In [0]:
# Mount the Azure Blob Storage container "bixi-data" through the wasbs:// scheme.
# TODO(security): credentials are written directly in the notebook source
# (masked here). Prefer reading them from a Databricks secret scope via
# dbutils.secrets.get(...) so they never end up in exported notebooks.
storage_account_name = "*******"
storage_account_key = "***********************************************"

# Spark configuration key under which the account key for this storage account is registered
account_key_setting = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"

# Register the account key in the current Spark session configuration
spark.conf.set(account_key_setting, storage_account_key)

mount_point = "/mnt/bixidata"

# Only mount when nothing is mounted at mount_point yet (mounting twice raises)
mount_points = dbutils.fs.mounts()
is_mounted = mount_point in {existing.mountPoint for existing in mount_points}

if not is_mounted:
    dbutils.fs.mount(
        source=f"wasbs://bixi-data@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={account_key_setting: storage_account_key},
    )




In [0]:
# Confirm the mount is reachable by listing its top-level entries
bixidata_root = dbutils.fs.ls("/mnt/bixidata")
display(bixidata_root)

path,name,size,modificationTime
dbfs:/mnt/bixidata/live-data/,live-data/,0,0
dbfs:/mnt/bixidata/transformed-data/,transformed-data/,0,0


In [0]:
# List the entries at each raw feed location on the mount
status_files = dbutils.fs.ls("/mnt/bixidata/live-data/station_status.json")
info_files = dbutils.fs.ls("/mnt/bixidata/live-data/station_information.json")

# Turn the FileInfo entries into the path strings spark.read.json accepts
status_file_paths = list(map(lambda entry: entry.path, status_files))
info_file_paths = list(map(lambda entry: entry.path, info_files))

# Import the col and explode
from pyspark.sql.functions import col, explode


def _stations_with_timestamp(raw_df):
    """Return one row per element of ``data.stations``, keeping ``last_updated``.

    The station struct is expanded into top-level columns via ``station.*``.
    """
    exploded = raw_df.selectExpr("last_updated", "explode(data.stations) as station")
    return exploded.select("last_updated", "station.*")


# Read the raw JSON snapshots
station_status_raw_df = spark.read.json(status_file_paths)
station_info_raw_df = spark.read.json(info_file_paths)

# Fields kept from the station_status feed
status_columns = [
    "last_updated",
    "station_id",
    "num_bikes_available",
    "num_ebikes_available",
    "num_bikes_disabled",
    "num_docks_available",
    "num_docks_disabled",
    "is_charging",
    "is_installed",
    "is_renting",
    "is_returning",
    "last_reported",
]

# Fields kept from the station_information feed (last_updated is not kept here)
info_columns = ["station_id", "name", "lat", "lon", "capacity"]

station_status_df = _stations_with_timestamp(station_status_raw_df).select(*status_columns)
station_info_df = _stations_with_timestamp(station_info_raw_df).select(*info_columns)



In [0]:
# Preview station metadata: first 20 rows, long strings truncated (Spark's defaults)
station_info_df.show(n=20, truncate=True)



+----------+--------------------+------------------+------------------+--------+
|station_id|                name|               lat|               lon|capacity|
+----------+--------------------+------------------+------------------+--------+
|         1|Métro Champ-de-Ma...| 45.50956143036354|-73.55743234369584|      19|
|         2|Ste-Catherine / D...|45.539385081961676|-73.54099988937377|      26|
|         3|       Clark / Evans|45.511105776623936|-73.56784012175922|      19|
|         4|du Champ-de-Mars ...| 45.50965520472071|-73.55400860309601|      23|
|         5|  Brittany / Ainsley| 45.52588991809354|-73.65003436803818|      19|
|         6|    Ann / Wellington|45.494520387544334|-73.55676591396332|      15|
|         7|de l'Hôtel-de-Vil...| 45.51166045593874|-73.56213569641113|      31|
|         8|Ste-Catherine / S...|45.512936177874096|-73.56124520301817|      39|
|         9|Crescent / de Mai...| 45.49812041247333|-73.57759594917297|      23|
|        10|de Grosbois / Du

In [0]:
# Attach static station metadata to every status row. This is an inner join,
# so stations present in only one of the two feeds are dropped.
merged_df = station_status_df.join(station_info_df, ["station_id"], "inner")

In [0]:
# Preview the joined data: first 20 rows, long strings truncated (Spark's defaults)
merged_df.show(n=20, truncate=True)

+----------+------------+-------------------+--------------------+------------------+-------------------+------------------+-----------+------------+----------+------------+-------------+--------------------+------------------+------------------+--------+
|station_id|last_updated|num_bikes_available|num_ebikes_available|num_bikes_disabled|num_docks_available|num_docks_disabled|is_charging|is_installed|is_renting|is_returning|last_reported|                name|               lat|               lon|capacity|
+----------+------------+-------------------+--------------------+------------------+-------------------+------------------+-----------+------------+----------+------------+-------------+--------------------+------------------+------------------+--------+
|         1|  1720709470|                 14|                   2|                 0|                  5|                 0|      false|           1|         1|           1|   1720708234|Métro Champ-de-Ma...| 45.50956143036354|-73.5

In [0]:
# List the output folder.
# NOTE(review): this cell runs before the cell that writes the CSV, so any file
# shown here comes from an earlier run — consider moving it after the save cell.
transformed_listing = dbutils.fs.ls("/mnt/bixidata/transformed-data")
display(transformed_listing)

path,name,size,modificationTime
dbfs:/mnt/bixidata/transformed-data/bixi_station_data.csv,bixi_station_data.csv,299832,1720708664000


In [0]:
# Import pandas for saving as csv
import io
import os

import pandas as pd


def _parse_csv(source):
    """Read CSV with the single parser configuration used throughout this cell.

    Parsing both the saved file and the fresh snapshot the same way keeps
    their column dtypes comparable. That lets ``drop_duplicates`` recognise a
    saved row and an identical new row as the same record.
    """
    return pd.read_csv(source, float_precision="round_trip")


# Convert Spark DF to pandas (toPandas collects every row onto the driver)
pandas_df = merged_df.toPandas()

# Define the file path (local /dbfs path of the mounted container)
output_file = "/dbfs/mnt/bixidata/transformed-data/bixi_station_data.csv"
print(output_file)

# Send the new snapshot through the same CSV parser as the saved file. Values
# that only differ by dtype (e.g. a string station_id "1" vs a re-read
# integer 1) then compare equal when deduplicating.
new_df = _parse_csv(io.StringIO(pandas_df.to_csv(index=False)))

# Append to previously saved data. A 0-byte file is treated as "no data"
# because read_csv cannot parse an empty file.
if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
    existing_df = _parse_csv(output_file)
    combined_df = pd.concat([existing_df, new_df], ignore_index=True)
else:
    combined_df = new_df

# Deduplicate AFTER combining. Cleaning the saved data and the new batch
# separately would still write rows that appear in both (e.g. when the cell
# is re-run before the feed refreshes) twice.
combined_df = combined_df.drop_duplicates(ignore_index=True)

# Save the combined DataFrame to CSV
combined_df.to_csv(output_file, index=False)




/dbfs/mnt/bixidata/transformed-data/bixi_station_data.csv
