In [16]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [17]:
from pyspark.sql import SparkSession

In [18]:
spark = SparkSession.builder.appName('demo').master("local").enableHiveSupport().getOrCreate()
spark

In [19]:
#Specifying the schema instead of inferring it 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, FloatType

inputSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True), 
                        StructField('weekday', StringType(),True),  
                        StructField('hour', IntegerType(),True),  
                        StructField('atm_status', StringType(),True),  
                        StructField('atm_id', StringType(),True),  
                        StructField('atm_manufacturer', StringType(),True),  
                        StructField('atm_location', StringType(),True),  
                        StructField('atm_streetname', StringType(),True),  
                        StructField('atm_street_number', IntegerType(),True),  
                        StructField('atm_zipcode', IntegerType(),True),  
                        StructField('atm_lat', FloatType(),True),  
                        StructField('atm_lon', FloatType(),True),  
                        StructField('currency', StringType(),True),  
                        StructField('card_type', StringType(),True),  
                        StructField('transction_amount', IntegerType(),True),  
                        StructField('service', StringType(),True),  
                        StructField('message_code', StringType(),True),  
                        StructField('message_text', StringType(),True),  
                        StructField('weather_lat', FloatType(),True),
                        StructField('weather_lon', FloatType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', FloatType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', FloatType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)
                        ])

In [20]:
# Creating dataframe from the csv file and infering the schema
df = spark.read.load("/user/root/srcatm/part-m-00000", schema = inputSchema,format="csv", sep=",")


In [21]:
# Printing the schema 
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string 

In [22]:
# Showing the elements of the dataframe
df.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+--------

In [23]:
# Check the count
df.count()

2468572

In [24]:
from pyspark.sql.functions import concat,col, lit
from pyspark.sql.functions import lpad
from pyspark import SparkContext
import pyspark.sql 
from pyspark import sql
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark import SparkConf, SparkContext

sqlContext = sql.SQLContext(spark)




In [25]:
# Location Dimension 
atm_loc_df = df.select([df.atm_location.alias("location"), df.atm_streetname.alias("streetname"), 
                        df.atm_street_number.alias("street_number"), df.atm_zipcode.alias("zipcode"), 
                        df.atm_lat.alias("lat"), df.atm_lon.alias("lon")])

In [26]:
atm_loc_df = atm_loc_df.dropDuplicates()

In [27]:
out_rdd1 = atm_loc_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row)+ [rowId+1]))

out_df1 = atm_loc_df.withColumn("location_id", lit(1))

dimlocation_df = sqlContext.createDataFrame(out_rdd1, schema=out_df1.schema).select(["location_id","location","streetname","street_number","zipcode", "lat", "lon"])


In [28]:
dimlocation_df.count()

109

In [29]:
dimlocation_df.show()

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          1|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          2|            Hasseris|       Hasserisvej|          113|   9000|57.044| 9.898|
|          3|         Bispensgade|       Bispensgade|           35|   9800|57.453| 9.996|
|          4|           HolbÃƒÂ¦k|       Slotsvolden|            7|   4300|55.718|11.704|
|          5|            Bindslev|       NÃƒÂ¸rrebro|           18|   9881|57.541|  10.2|
|          6|      Bryggen  Vejle|  SÃƒÂ¸nderbrogade|            2|   7100|55.705| 9.532|
|          7|          Abildgaard|    HjÃƒÂ¸rringvej|          144|   9900|57.447|10.506|
|          8|            Slagelse|   Mariendals Alle|           29|   4200|55.398|11.342|
|         

In [30]:
dimlocation_df.coalesce(1).write.format('csv').save('/root/data20')

In [31]:
#from pyspark.sql.functions import from_utc_timestamp
#from pyspark.sql.functions import to_timestamp
#from pyspark.sql import functions 
from pyspark.sql.functions import col, from_unixtime
from pyspark.sql.functions import unix_timestamp
import pandas
import time
import os


In [32]:
# Date Dimension 
atm_date_df = df.select([df.year,df.month,df.day,df.weekday,df.hour])




In [33]:
res_df = atm_date_df.distinct().withColumn("full_date", from_unixtime(unix_timestamp(concat(atm_date_df.year.cast(StringType()),
                                                                     atm_date_df.month.cast(StringType()), 
                                                                   lpad(df.day.cast(StringType()), 2, '0'),
                                                                   lpad(df.hour.cast(StringType()), 2, '0')),'yyyyMMMMMddHH'), 'YYYY-MM-dd HH:mm:SS'))




In [34]:
out_rdd = res_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row)+ [rowId+1]))

out_df = res_df.withColumn("date_id", lit(1))

dimdate_df = sqlContext.createDataFrame(out_rdd, schema=out_df.schema).select(["date_id","full_date","year","month","day", "hour", "weekday"])


In [35]:
dimdate_df.count()

8685

In [36]:
dimdate_df.show()

+-------+-------------------+----+--------+---+----+--------+
|date_id|          full_date|year|   month|day|hour| weekday|
+-------+-------------------+----+--------+---+----+--------+
|      1|2017-01-01 09:00:00|2017| January|  1|   9|  Sunday|
|      2|2017-01-03 05:00:00|2017| January|  3|   5| Tuesday|
|      3|2017-01-08 19:00:00|2017| January|  8|  19|  Sunday|
|      4|2017-01-21 03:00:00|2017| January| 21|   3|Saturday|
|      5|2017-01-23 21:00:00|2017| January| 23|  21|  Monday|
|      6|2017-02-02 19:00:00|2017|February|  2|  19|Thursday|
|      7|2017-02-05 16:00:00|2017|February|  5|  16|  Sunday|
|      8|2017-02-21 15:00:00|2017|February| 21|  15| Tuesday|
|      9|2017-03-02 08:00:00|2017|   March|  2|   8|Thursday|
|     10|2017-04-02 02:00:00|2017|   April|  2|   2|  Sunday|
|     11|2017-04-06 08:00:00|2017|   April|  6|   8|Thursday|
|     12|2017-04-30 10:00:00|2017|   April| 30|  10|  Sunday|
|     13|2017-05-02 02:00:00|2017|     May|  2|   2| Tuesday|
|     14

In [37]:
dimdate_df.coalesce(1).write.format('csv').save('/root/data30')

In [38]:
# Card Dimension 
atm_card_df = df.select([df.card_type.alias("card_type")])

In [39]:
atm_card_df = atm_card_df.dropDuplicates()

In [40]:
out_rdd2 = atm_card_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row)+ [rowId+1]))

out_df2 = atm_card_df.withColumn("card_type_id", lit(1))

dimcard_df = sqlContext.createDataFrame(out_rdd2, schema=out_df2.schema).select(["card_type_id","card_type"])

In [41]:
dimcard_df.count()

12

In [42]:
dimcard_df.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [43]:
dimcard_df.coalesce(1).write.format('csv').save('/root/data40')

In [44]:
# ATM Dimension 
atm_atm_df = df.select([df.atm_id, df.atm_manufacturer, df.atm_lat, df.atm_lon])


In [45]:
atm_atm_df = atm_atm_df.dropDuplicates()

In [46]:
out_rdd3 = atm_atm_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row)+ [rowId+1]))

out_df3 = atm_atm_df.withColumn("atm_main_id", lit(1))

dimatm_df = sqlContext.createDataFrame(out_rdd3, schema=out_df3.schema).select(["atm_main_id","atm_id","atm_manufacturer",
                                                                               "atm_lat","atm_lon"])

In [47]:
dimatm_df.count()

113

In [48]:
new_atm_df = dimatm_df.join(dimlocation_df, (dimatm_df.atm_lat == dimlocation_df.lat) & 
                             (dimatm_df.atm_lon == dimlocation_df.lon))
new_atm_df.count()

156

In [49]:
new_atm_df.show()


+-----------+------+----------------+-------+-------+-----------+--------------------+----------------+-------------+-------+------+------+
|atm_main_id|atm_id|atm_manufacturer|atm_lat|atm_lon|location_id|            location|      streetname|street_number|zipcode|   lat|   lon|
+-----------+------+----------------+-------+-------+-----------+--------------------+----------------+-------------+-------+------+------+
|         38|    18| Diebold Nixdorf| 56.448|  9.401|         13|              Viborg|       Toldboden|            3|   8800|56.448| 9.401|
|         64|   101|             NCR| 55.705|  9.532|          6|      Bryggen  Vejle|SÃƒÂ¸nderbrogade|            2|   7100|55.705| 9.532|
|         20|     9| Diebold Nixdorf| 56.716| 10.114|         25|             Hadsund|       Storegade|           12|   9560|56.716|10.114|
|         31|    64|             NCR| 55.859|  9.854|         88|             Horsens| GrÃƒÂ¸nlandsvej|            5|   8700|55.859| 9.854|
|         18|    30|

In [50]:
new_atm_df.coalesce(1).write.format('csv').save('/root/data50')

In [51]:
# fact table


In [52]:
df=df.alias('df')
dimdate_df=dimdate_df.alias('dimdate_df')
new_atm_df=new_atm_df.alias('new_atm_df')
dimlocation_df=dimlocation_df.alias('dimlocation_df')
dimcard_df=dimcard_df.alias('dimcard_df')



In [53]:
# 1st join with Date
stg1_df=df.join(dimdate_df,
                on=['year','month','day','hour','weekday'], 
                how='left').select("df.*", "dimdate_df.date_id").drop(*['year','month','day','hour','weekday'])

In [54]:
stg1_df=stg1_df.alias('stg1_df')


In [55]:
# 2nd join with Card
stg2_df=stg1_df.join(dimcard_df,
                    on=['card_type'],
                    how='left').select("stg1_df.*","card_type_id").drop(*['card_type'])

In [56]:
stg2_df.show()

+----------+------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+
|atm_status|atm_id|atm_manufacturer|        atm_location|   atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|transction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|
+----------+------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+-

In [57]:
# 3rd join with Location
stg2_df=stg2_df.alias('stg2_df')

stg2_df=stg2_df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode')




In [58]:
stg3_df=stg2_df.join(dimlocation_df,
                    on=['location','lat','lon','streetname','street_number','zipcode'],
                    how='left').select("stg2_df.*","location_id").drop(*['location','lat','lon','streetname','street_number','zipcode'])


In [59]:
stg3_df.show()

+----------+------+----------------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+-------+------------+-----------+
|atm_status|atm_id|atm_manufacturer|currency|transction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|date_id|card_type_id|location_id|
+----------+------+----------------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+-------+------------+-----------+
|    Active|     6|             NCR|     DKK|             5622|Withdrawal|        null|       

In [60]:
# 4th join with ATM
stg3_df=stg3_df.alias('stg3_df')

In [61]:
stg3_df=stg3_df.withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode')


In [62]:
stg4_df=stg3_df.join(new_atm_df,
                    on=['atm_manufacturer','location_id'],
                    how='left').select("stg3_df.*","atm_main_id").drop(*['atm_manufacturer','location_id'])


In [63]:
stg4_df.show()

+----------+------+--------+-----------------+----------+------------+--------------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+-----------+
|atm_status|atm_id|currency|transction_amount|   service|message_code|        message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|atm_main_id|
+----------+------+--------+-----------------+----------+------------+--------------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+-----------+
|    Active|    51|     DKK|             1999|Withdrawal|        null|                null|     56.994|      9.991|        2

In [64]:
stg_temp = stg4_df.dropDuplicates()


In [65]:
out_rdd_fact = stg_temp.rdd.zipWithIndex().map(lambda (row,rowId): (list(row)+ [rowId+1]))

out_df_fact = stg_temp.withColumn("trans_id", lit(1))

fact_df = sqlContext.createDataFrame(out_rdd_fact, schema=out_df_fact.schema).select(["trans_id","atm_status","atm_id","currency","transction_amount","service","message_code","message_text","weather_lat","weather_lon","weather_city_id","weather_city_name","temp","pressure","humidity","wind_speed","wind_deg","rain_3h","clouds_all","weather_id","weather_main","weather_description","date_id","card_type_id","atm_main_id"])


In [66]:
fact_df.count()

3095761

In [67]:
fact_df.coalesce(1).write.format('csv').save('/root/data60')

In [68]:
fact_df.show()

+--------+----------+------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+-------+------------+-----------+
|trans_id|atm_status|atm_id|currency|transction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|date_id|card_type_id|atm_main_id|
+--------+----------+------+--------+-----------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+-------+------------+-----------+
|       1|    Active|    51|     DKK|             1537|Withdrawal|        null|        null|     56.994|      9.991|  

Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda-2019.10/lib/python2.7/SocketServer.py", line 293, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/cloudera/parcels/Anaconda-2019.10/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/opt/cloudera/parcels/Anaconda-2019.10/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/cloudera/parcels/Anaconda-2019.10/lib/python2.7/SocketServer.py", line 655, in __init__
    self.handle()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2//python/lib/pyspark.zip/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2//python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
    