### 1. Load the CSV files
I first upload the CSV files to DBFS, then load each one into its own DataFrame.

In [2]:
%python
# Load cust name
URL_cust_name = "/FileStore/tables/cust_names.csv"
df_cust_name = spark.read.format("csv").option("header", "true").option("inferSchema",  "true").load(URL_cust_name)
# display(df_cust_name.head(5))

# Load dcu loation
URL_dcu_location = "/FileStore/tables/dcu_locations.csv"
df_dcu_location = spark.read.format("csv").option("header", "true").option("inferSchema",  "true").load(URL_dcu_location)
# display(df_dcu_location.head(5))

# Load endpoint location
URL_endpoint_lat_long = "/FileStore/tables/endpoint_lat_long.csv"
df_endpoint_lat_long = spark.read.format("csv").option("header", "true").option("inferSchema",  "true").load(URL_endpoint_lat_long)
df_endpoint_lat_long.count()

# Load endpoint location
URL_trpkt = "/FileStore/tables/trpkt.csv"
df_trpkt = spark.read.format("csv").option("header", "true").option("inferSchema",  "true").load(URL_trpkt)
df_trpkt.count()


### 2. Filter unqualified data
Records containing the spreadsheet error values "#VALUE!" or "#N/A" must be removed. Since this is only a prototype, the rejected records are not exported.

In [4]:
# Drop packet rows whose payload_id holds the spreadsheet error "#VALUE!"
# NOTE(review): a `!=` comparison against NULL is not true in Spark SQL, so rows with a
# NULL payload_id / endpoint_id are presumably dropped as well — confirm this is intended.
df_trpkt = df_trpkt.where(df_trpkt.payload_id != "#VALUE!")

# Drop endpoint rows whose endpoint_id holds the spreadsheet error "#N/A"
df_endpoint_lat_long = df_endpoint_lat_long.where(
    df_endpoint_lat_long.endpoint_id != "#N/A"
)
# TODO: write the rejected rows out to a file instead of silently discarding them

### 3. Transform into the fact table

3.1 convert datetime to day

3.2 aggregation for the number of transmissions

3.3 aggregation for the number of receptions

3.4 left join two tables

3.5 calc redundancy rate

3.6 get cust_id, endpoint geo information

3.7 duplicate table, get dcu geo information

In [6]:
from pyspark.sql.functions import substring
# Derive the day by keeping the first 10 characters of packet_datetime
# (assumes the value starts with a 10-character date such as yyyy-MM-dd — TODO confirm)
df_trpkt = df_trpkt.withColumn(
    "date",
    substring(df_trpkt["packet_datetime"], 1, 10),
)


In [7]:
from pyspark.sql.functions import countDistinct
# Transmissions: distinct payloads per (date, endpoint)
df_number_transmission = (
    df_trpkt
    .groupBy("date", "endpoint_id")
    .agg(countDistinct("payload_id").alias("num_transmissions"))
)

# Receptions: distinct payloads per (date, endpoint, DCU)
df_number_receptions = (
    df_trpkt
    .groupBy("date", "endpoint_id", "dcu_id")
    .agg(countDistinct("payload_id").alias("num_receptions"))
)

# Bring the per-endpoint transmission count onto every per-DCU reception row
df_result = df_number_receptions.join(
    df_number_transmission,
    on=["date", "endpoint_id"],
    how="left",
)

# Redundancy rate = receptions at this DCU / transmissions by this endpoint
df_result = df_result.withColumn(
    "redundancy_rate",
    df_result["num_receptions"] / df_result["num_transmissions"],
)
df_result.count()

In [8]:
import pyspark.sql.functions as sf
from pyspark.sql.functions import col
# Look up cust_id through the DCU location table (matched on dcu_id).
# The DCU coordinates are dropped so that the endpoint coordinates joined below
# become the only longitude/latitude columns on these rows.
# NOTE(review): this cell rebinds df_result, so running it twice on the same kernel
# would repeat the joins — rerun the aggregation cell first.
df_result = df_result.join(df_dcu_location, on=["dcu_id"], how="left")
df_result = df_result.drop("longitude", "latitude")

# Mark these rows as endpoint rows
df_result = df_result.withColumn("type", sf.lit("endpoint"))

# Attach the endpoint coordinates, matched on (cust_id, endpoint_id)
df_result = df_result.join(
    df_endpoint_lat_long,
    on=["cust_id", "endpoint_id"],
    how="left",
)

# Second copy of the table, tagged "dcu" and carrying the DCU coordinates instead.
# NOTE(review): the select swaps the two id columns (dcu_id is emitted as endpoint_id
# and vice versa); this looks deliberate so the first id matches the row's type — confirm.
df_result2 = (
    df_result
    .drop("type", "longitude", "latitude")
    .withColumn("type", sf.lit("dcu"))
    .join(df_dcu_location, on=["cust_id", "dcu_id"], how="left")
    .select(
        "cust_id",
        col("dcu_id").alias("endpoint_id"),
        "date",
        col("endpoint_id").alias("dcu_id"),
        "num_receptions",
        "num_transmissions",
        "redundancy_rate",
        "type",
        "longitude",
        "latitude",
    )
)

# Stack the DCU rows and the endpoint rows, matching columns by name
df_result3 = df_result2.unionByName(df_result)
df_result3.show()

In [9]:
# Row count of the combined result (the "dcu" rows unioned with the "endpoint" rows)
df_result3.count()

In [10]:

# Persist the result as CSV under the "test_redundancy" output directory.
# coalesce(1) collapses the data to a single partition so one part file is written;
# "overwrite" replaces any earlier output at that path.
# The built-in "csv" source is used, matching the format used when loading the inputs.
(
    df_result3.coalesce(1)
    .write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("test_redundancy")
)

### 4. Write result to MySQL

In [12]:
# Credentials are left blank on purpose; supply them at run time rather than
# committing real values to the notebook.
username = ""
pwd = ""

# Connection properties handed to the JDBC driver
connectionProperties = {
  "user" : username,
  "password" : pwd,
  "driver" : "com.mysql.jdbc.Driver"}

# JDBC URL for the MySQL server (no stray ':' between '//' and the host).
# NOTE(review): no database is named in the URL; presumably "/<database>" has to be
# appended (or the table name qualified) before the write can succeed — confirm.
url = "jdbc:mysql://aclara-azure-dev-server.mysql.database.azure.com"

# destination database table
table = "test_redundancy"

# Write the DataFrame to the database, appending to any existing rows.
# `properties` is passed by keyword because the third positional parameter of
# DataFrameWriter.jdbc is `mode`, not the connection properties.
df_result3.write.mode("append").jdbc(url, table, properties=connectionProperties)