In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
# Spark session for this notebook, created (or reused) with a local master.
spark = (
    SparkSession.builder
    .appName("Day1_1")
    .master("local")
    .config('spark.executor.instances', 8)
    .getOrCreate()
)

# Azure Storage connection details.
# NOTE(review): the "**" values look like redacted placeholders — supply real
# values from a secure source before running.
storage_account_name = "**"  # storage account name
storage_account_key = "**"   # storage account access key
container = "data"           # source container

# Register the account key under the per-account config key so Spark can
# read from / write to the blob store.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key,
)

In [0]:
# Source CSV in blob storage, addressed as wasb://<container>@<account>.blob.core.windows.net/<file>.
file_location = "wasb://data@rajdep561.blob.core.windows.net/effects-of-covid-19-on-trade-at-15-december-2021-provisional.csv"

# The file is read with a header row and no schema, so the numeric columns are
# cast explicitly below.
df = spark.read.format('csv').option('header','true').load(file_location)
df = df.withColumn('Value',df['Value'].cast('int'))
# Cumulative is a running total of Value, so it is cast to a 64-bit 'long':
# a 32-bit 'int' cannot represent totals above 2,147,483,647 (Spark would
# yield null, or raise in ANSI mode, for such rows).
df = df.withColumn('Cumulative',df['Cumulative'].cast('long'))
df.show()

+---------+----+----------+---------+-------+---------+--------------+-------+---------+----------+
|Direction|Year|      Date|  Weekday|Country|Commodity|Transport_Mode|Measure|    Value|Cumulative|
+---------+----+----------+---------+-------+---------+--------------+-------+---------+----------+
|  Exports|2015|01/01/2015| Thursday|    All|      All|           All|      $|104000000| 104000000|
|  Exports|2015|02/01/2015|   Friday|    All|      All|           All|      $| 96000000| 200000000|
|  Exports|2015|03/01/2015| Saturday|    All|      All|           All|      $| 61000000| 262000000|
|  Exports|2015|04/01/2015|   Sunday|    All|      All|           All|      $| 74000000| 336000000|
|  Exports|2015|05/01/2015|   Monday|    All|      All|           All|      $|105000000| 442000000|
|  Exports|2015|06/01/2015|  Tuesday|    All|      All|           All|      $| 76000000| 518000000|
|  Exports|2015|07/01/2015|Wednesday|    All|      All|           All|      $| 59000000| 577000000|


In [0]:
# Fixed label -> integer id lookup tables; each id is the label's position in
# the tuple it is enumerated from.
measure = {label: idx for idx, label in enumerate(('Tonnes', '$'))}
trans = {label: idx for idx, label in enumerate(('All', 'Sea', 'Air'))}
direct = {label: idx for idx, label in enumerate(('Imports', 'Exports', 'Reimports'))}


def mes_id(x):
    """Return the id of measure label ``x``; a label not in ``measure`` raises KeyError."""
    measure_id = measure[x]
    return measure_id


def tm(x):
    """Return the id of transport-mode label ``x``; a label not in ``trans`` raises KeyError."""
    mode_id = trans[x]
    return mode_id


def direction(x):
    """Return the id of direction label ``x``; a label not in ``direct`` raises KeyError."""
    direction_id = direct[x]
    return direction_id
# Spark UDF wrappers around the lookup helpers.
# The return type is declared as 'int' explicitly: without it, udf() defaults
# to a string return type and the generated *_id columns would be strings
# rather than integers. The helpers are passed directly; wrapping them in a
# lambda adds nothing.
cdf = udf(mes_id, 'int')
tmudf = udf(tm, 'int')
d_udf = udf(direction, 'int')

# Distinct country labels collected to the driver as an array with one value per row.
dat = df.select('Country').distinct().toPandas().values
# Country label -> id, where the id is the label's position in ``dat``.
# NOTE(review): ids follow whatever order the distinct rows come back in, so
# they may differ between runs — confirm if stable ids are required.
cd = {row[0]: idx for idx, row in enumerate(dat)}


def country(x):
    """Return the id assigned to country label ``x``; a label not in ``cd`` raises KeyError."""
    return cd[x]


# Declare an 'int' return type: udf() defaults to a string return type, which
# would make country_id a string column instead of an integer one.
c_ = udf(country, 'int')

# Distinct commodity labels collected to the driver as an array with one value per row.
dat1 = df.select('Commodity').distinct().toPandas().values
# Commodity label -> id, where the id is the label's position in ``dat1``.
# NOTE(review): ids follow whatever order the distinct rows come back in, so
# they may differ between runs — confirm if stable ids are required.
com = {row[0]: idx for idx, row in enumerate(dat1)}


def commodity(x):
    """Return the id assigned to commodity label ``x``; a label not in ``com`` raises KeyError."""
    return com[x]


# Declare an 'int' return type: udf() defaults to a string return type, which
# would make commodity_id a string column instead of an integer one.
comm = udf(commodity, 'int')


In [0]:
def _attach_id(frame, label_col, id_col, id_udf):
    """Add ``id_col`` (``id_udf`` applied to ``label_col``) and show the distinct pairs.

    Returns a tuple of the frame with the new column and the frame of
    distinct (label, id) rows.
    """
    widened = frame.withColumn(id_col, id_udf(col(label_col)))
    lookup = widened.select(label_col, id_col).distinct()
    lookup.show()
    return widened, lookup


# One id column and one lookup frame per label column, in the same order the
# lookup tables are displayed below.
df, measure_df = _attach_id(df, 'Measure', 'measure_id', cdf)
df, tm_df = _attach_id(df, 'Transport_Mode', 'tm_id', tmudf)
df, direction_df = _attach_id(df, 'Direction', 'direction_id', d_udf)
df, commodity_df = _attach_id(df, 'Commodity', 'commodity_id', comm)
df, country_df = _attach_id(df, 'Country', 'country_id', c_)

+-------+----------+
|Measure|measure_id|
+-------+----------+
| Tonnes|         0|
|      $|         1|
+-------+----------+

+--------------+-----+
|Transport_Mode|tm_id|
+--------------+-----+
|           All|    0|
|           Air|    2|
|           Sea|    1|
+--------------+-----+

+---------+------------+
|Direction|direction_id|
+---------+------------+
|Reimports|           2|
|  Imports|           0|
|  Exports|           1|
+---------+------------+

+--------------------+------------+
|           Commodity|commodity_id|
+--------------------+------------+
|Electrical machin...|           2|
|Meat and edible o...|           5|
|                 All|           1|
|               Fruit|           3|
|Non-food manufact...|           8|
|Mechanical machin...|           4|
|Logs, wood, and w...|           6|
|Fish, crustaceans...|           0|
|Milk powder, butt...|           7|
+--------------------+------------+

+--------------------+----------+
|             Country|country_id

In [0]:
# Label columns to remove from df; the lookup frames built earlier keep the
# label <-> id pairs, and df keeps the *_id columns.
cols = (
    "Country",
    "Direction",
    "Transport_Mode",
    'Measure',
    'Commodity',
)
df = df.drop(*cols)

In [0]:
# Write the country lookup frame to the "country" table over JDBC, replacing
# any existing contents (mode "overwrite").
(
    country_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "country")
    .save()
)

In [0]:
# Write the commodity lookup frame to the "commodity" table over JDBC,
# replacing any existing contents (mode "overwrite").
(
    commodity_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "commodity")
    .save()
)

In [0]:
# Write the measure lookup frame to the "measure" table over JDBC, replacing
# any existing contents (mode "overwrite").
(
    measure_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "measure")
    .save()
)

In [0]:
# Write the transport-mode lookup frame to the "tm" table over JDBC, replacing
# any existing contents (mode "overwrite").
(
    tm_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "tm")
    .save()
)

In [0]:
# Write the direction lookup frame to the "direction" table over JDBC,
# replacing any existing contents (mode "overwrite").
# The URL previously carried a stray "@rajdep561;;" segment with no property
# keys; it now uses the same user=/password= properties as the other table
# writes in this notebook.
# NOTE(review): the password value is blank in this file — supply credentials
# from a secure source before running.
(
    direction_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "direction")
    .save()
)

In [0]:
# Write the main frame (label columns dropped, *_id columns kept) to the
# "fact" table over JDBC, replacing any existing contents (mode "overwrite").
# The URL previously had an empty ";;" segment and no user/password
# properties; it now uses the same user=/password= properties as the other
# table writes in this notebook.
# NOTE(review): the password value is blank in this file — supply credentials
# from a secure source before running.
(
    df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", "jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=@rajdep561;password=;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
    .option("dbtable", "fact")
    .save()
)