In [0]:
from pyspark.sql.functions import round, lit, avg

# Azure Blob Storage settings: container, account and access key, plus the two strings
# derived from them (the wasbs:// source URL and the Spark config key for the account key).
# NOTE(review): the access key is a plain string literal here; presumably a placeholder —
# confirm no real key is committed with the notebook.
container_name = "container_name"
storage_account_name = "storage_account_name"
storage_account_access_key = "storage_account_key"
url = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"
config = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"

In [0]:
# Mounting azure storage
# dbutils.fs.mount raises when the mount point is already in use, so look at the existing
# mounts first; this keeps the cell re-runnable on a cluster where the mount is still attached.
if not any(mount.mountPoint == "/mnt/speedAnalysis" for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = url,
        mount_point = "/mnt/speedAnalysis",
        extra_configs = {config:storage_account_access_key}
    )

In [0]:
# Defining schema for DF
from pyspark.sql.types import *
# Schema applied when reading the input CSV files: six columns, every one declared
# non-nullable (third argument False), in the order they are added below.
schema = (
    StructType()
    .add("operator", StringType(), False)
    .add("speed_type", StringType(), False)
    .add("download_upload", StringType(), False)
    .add("speed_kbps", IntegerType(), False)
    .add("latency", IntegerType(), False)
    .add("state", StringType(), False)
)

# Defining Schema for Final DF - Not Used
# Column order and types follow what the processing loop writes: the four grouping
# columns first, then the rounded averages (avg() yields doubles), then the month and
# year literals taken from the file name.
finalDF_schema = StructType([
    StructField("operator", StringType(), False),
    StructField("speed_type", StringType(), False),
    StructField("download_upload", StringType(), False),
    StructField("state", StringType(), False),
    StructField("speed_kbps", DoubleType(), False),
    StructField("latency", DoubleType(), False),
    StructField("speed_mbps", DoubleType(), False),
    StructField("month", StringType(), False),
    StructField("year", StringType(), False),
])
# Empty DataFrame carrying that schema. Built through the `spark` session, which the rest
# of the notebook already uses, instead of the legacy sqlContext / sc.emptyRDD() pair.
finalDF = spark.createDataFrame([], finalDF_schema)

In [0]:
# Iterating through each file in the mounted directory
#
# For every input file: read it with the raw schema, drop unusable rows, average the
# numeric columns per (operator, speed_type, download_upload, state) and append the
# result, tagged with the month and year taken from the file name, to the output path.

for file in dbutils.fs.ls("/mnt/speedAnalysis"):
    # Extracting file specific details (entry 0 is the full path, entry 1 the name)
    file_path, file_name = file[0], file[1]

    # The aggregated output is saved under /mnt/speedAnalysis/final/, i.e. inside the
    # directory being listed. dbutils.fs.ls reports directories with a trailing "/", so
    # skip them; otherwise a re-run would try to parse "final/" as an input file.
    if file_name.endswith("/"):
        continue

    # The first two whitespace-separated tokens of the file name are month and year.
    name_tokens = file_name.split()
    if len(name_tokens) < 2:
        print(f"Skipping {file_name}: expected a name starting with '<month> <year>'")
        continue
    month, year = name_tokens[:2]
    print(f"Currently Processing file for {month} - {year}")

    # Reading csv file
    df = (spark.read.format("csv")
          .schema(schema)
          .load(file_path))

    # Removing null and NA values, calculating speed in MBPS.
    # `round` is pyspark.sql.functions.round (imported at the top), not the Python builtin.
    cleaned_df = (df.filter(df.latency.isNotNull() & (df.state != 'NA') & (df.speed_kbps.isNotNull()))
                  .withColumn("speed_mbps", round(df.speed_kbps/1024,2))
                 )

    # Grouping all columns and aggregating numerical columns
    groupedDF = (cleaned_df.groupBy("operator", "speed_type", "download_upload","state")
             .agg(round(avg("speed_kbps"),2).alias("speed_kbps"), round( avg("latency"),2).alias("latency"), round(avg("speed_mbps"),2).alias("speed_mbps"))
             .orderBy("operator", "speed_type","state")
             .withColumn("month", lit(month))
             .withColumn("year", lit(year))
                )

    # Writing DF to csv file in append mode
    # NOTE(review): with append mode, re-running this cell adds the same aggregates to the
    # output a second time — confirm that is intended or clear the output path first.
    groupedDF.write.format("csv").mode("append").save("/mnt/speedAnalysis/final/speed.csv")

In [0]:
# Read the aggregated output back. No schema or header option is given, so the columns
# get Spark's default positional names (_c0, _c1, ...), which the later cells rely on.
df2 = spark.read.csv("dbfs:/mnt/speedAnalysis/final/speed.csv")
df2.show()

In [0]:
# Show the rows whose eighth column (_c7) equals "Jan".
df2.where(df2["_c7"] == "Jan").show()

In [0]:
# Count the rows whose eighth column (_c7) equals "December".
df2.where(df2["_c7"] == "December").count()

In [0]:
# Detach the storage mount, but only when it is currently attached: dbutils.fs.unmount
# raises if the path is not mounted, which would break a re-run of this cell.
# NOTE(review): df2 was loaded from dbfs:/mnt/speedAnalysis/... and Spark evaluates lazily,
# so any later action on df2 still needs the mount — confirm the cells that write df2
# are meant to run before this one.
if any(mount.mountPoint == "/mnt/speedAnalysis" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/speedAnalysis")

In [0]:
# NOTE(review): this cell held the incomplete expression `df2.create`, which raises
# AttributeError (a DataFrame has no `create` attribute) and aborts "Run All".
# It was presumably the start of `df2.createOrReplaceTempView(...)` — TODO confirm and
# either finish the call or delete this cell.

In [0]:
# Save df2 as CSV under /FileStore, replacing whatever is already at that path.
# df2 is evaluated lazily from the mounted output path, so the mount must be attached here.
df2.write.mode("overwrite").csv("/FileStore/my-stuff/speed3.csv")

In [0]:
# Save df2 as a single-partition CSV (coalesce(1) yields one part file) under /FileStore.
# An explicit overwrite mode is set, matching the speed3.csv export: without it the writer's
# default mode fails once the target path exists, so the cell could only be run once.
# df2 is evaluated lazily from the mounted output path, so the mount must be attached here.
df2.coalesce(1).write.format("csv").mode("overwrite").save("/FileStore/my-stuff/speedFinal.csv")