In [None]:
 # Importing Postgres JDBC packages for data extraction
 # The line below starts PySpark with the Postgres JDBC driver package
 # (org.postgresql:postgresql:42.2.10) and an in-memory SQL catalog.
 # NOTE(review): this is a shell command line, not Python syntax; presumably it
 # is run from a terminal to start the session and was pasted here for
 # reference only - confirm how the kernel is actually launched.
pyspark --packages org.postgresql:postgresql:42.2.10 --conf spark.sql.catalogImplementation=in-memory

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import rand
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import covar_pop
from pyspark.sql.functions import input_file_name
import pandas as pd
import glob
import os
import pandas as pd

# Build (or reuse) the Spark session for this notebook.
# The Postgres JDBC driver package and the in-memory catalog are requested
# here so that a freshly started kernel gets them without depending on a
# special `pyspark --packages ...` launch. If a session already exists,
# getOrCreate() returns it and these settings are simply not re-applied.
spark = (
    SparkSession.builder.master("local[1]")
    .appName('Sparkusecase')
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.10")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

# Temptables cleaning process

# Remove any temp views left over from a previous run so the extraction
# below starts from a clean catalog.
for stale_view in ("cust_info_s", "trans_info_s", "offer_info_s"):
    spark.catalog.dropTempView(stale_view)

# List of table to extract from source server

tablename_list = ['cust_info_s','trans_info_s','offer_info_s']

# Data extraction from Postgres server for dependent tables
#
# The reader is built from the `spark` session created above rather than the
# `sqlContext` global, which only exists when the kernel is started through
# the pyspark shell. The password is taken from the POSTGRES_PASSWORD
# environment variable when set; the literal fallback only keeps existing
# local setups working and should be removed once the variable is in place.
url = "jdbc:postgresql://localhost:5432/postgres"
reader = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("user", "postgres")
    .option("password", os.environ.get("POSTGRES_PASSWORD", "puvi"))
    .option("driver", "org.postgresql.Driver")
)
for tablename in tablename_list:
    # createOrReplaceTempView replaces the deprecated registerTempTable and
    # registers each source table under its own name for the SQL below.
    reader.option("dbtable", tablename).load().createOrReplaceTempView(tablename)


    
# Sample view of source tables extracted from Postgres server   

# Bind the SQL context explicitly to the session created above. The original
# relied on the shell-provided `sc` global, which is undefined in a plain
# kernel; passing the session also guarantees the temp views registered on
# `spark` are visible through `sqlc`.
sqlc = SQLContext(spark.sparkContext, spark)

# Preview the first 10 rows of each extracted source table.
sqlc.sql("select * from cust_info_s").show(10)
sqlc.sql("select * from trans_info_s").show(10)
sqlc.sql("select * from offer_info_s").show(10)

# Transforming the data into business requirements
#
# sales_data  : per (date, cust_id) sum of sales and count of transactions (visits).
# offer_data  : per customer, count of offer ids and sum of offer_redem after
#               joining offer_info_s to trans_info_s on cust_id.
# final select: customer attributes joined to both CTEs, plus age, the weekday
#               abbreviation ('E' pattern) and the week number of the
#               transaction date.
#
# Age is derived from whole months elapsed divided by 12; the previous
# datediff/365 form over-counts by about one day per leap year, so the age
# incremented several days before the actual birthday.
# The week number uses weekofyear() instead of date_format(..., 'w'):
# week-based format patterns are rejected by date_format on Spark 3+.
# NOTE(review): the inner join on b.cust_id drops customers without sales, so
# the preceding left join effectively behaves as an inner join - confirm intent.
df_transf = sqlc.sql("""  WITH sales_data AS ( select sum(sales) as total_sales,count(trans_id) as visits,cust_id,date 
            from trans_info_s
            group by date,cust_id),
            offer_data AS ( 
            select count(o.offer_id) as no_offer_received,sum(offer_redem) as no_offer_redem,o.cust_id 
            from offer_info_s o
            inner join trans_info_s t
            on t.cust_id = o.cust_id
            group by o.cust_id order by o.cust_id)
            select b.date,b.cust_id,a.cust_name,a.cust_dob,
            int(months_between(current_date(),TO_DATE(CAST(UNIX_TIMESTAMP(a.cust_dob,'yyyy-MM-dd') AS TIMESTAMP)))/12) as age,
            total_sales,c.no_offer_received,c.no_offer_redem,visits, date_format(to_date(b.date),'E') as days,
            weekofyear(to_date(b.date)) AS  week_number
            from cust_info_s a
            left join sales_data b
            on a.cust_id = b.cust_id
            inner join offer_data c
            on b.cust_id = c.cust_id
            group by b.date,a.cust_id,b.cust_id,a.cust_name,a.cust_dob,c.no_offer_received,c.no_offer_redem,total_sales,visits,days
            order by a.cust_id
            """)

# Spread the weekday column into one column per day: for every combination of
# the key columns below, each day column holds max(no_offer_received), with
# missing combinations filled with 0.
weekly_keys = [
    'cust_id', 'cust_name', 'age', 'week_number',
    'no_offer_received', 'no_offer_redem', 'total_sales',
]
rows_by_customer = df_transf.orderBy('cust_id')
offers_per_day = rows_by_customer.groupby(*weekly_keys).pivot('days').max('no_offer_received')
final_dfs = offers_per_day.fillna(0)

# Sampling data - in case of job failure the data can be viewed in the Spark UI

final_dfs.show()


# Writing the final data into CSV files (single part file, header row, overwrite)


single_file_writer = final_dfs.repartition(1).write.mode("overwrite").option("header",True)
single_file_writer.csv("file:///C:/hdaoopdata/ics_weekly_data/")


+-------------------+----------------+----------+--------+
|          cust_name|         cust_id|  cust_dob|  gender|
+-------------------+----------------+----------+--------+
|         RoseJacobi|6011837504027367|1981-08-06|   Edgar|
|Dr. Carley Predovic|4716744331063073|1984-07-30|   Deron|
|      Lilla Weimann|5370009829177288|1993-05-16|Leonardo|
|Cathrine Hodkiewicz| 345858730400343|1995-05-20|    Rick|
|      Celia Pfeffer| 340364615541355|2009-06-25|  Bryana|
|        Cleve Fahey|2628327277351883|2017-08-16| Ibrahim|
|     Gillian Senger| 346688355572616|1972-02-06| Keshawn|
|     Joshua Roberts|5290632464794443|1995-08-18|   Ethan|
|   Rasheed Gislason|2506922387299209|1994-07-06| Rasheed|
|      Felicity Ryan|4532251518044506|2001-09-08|   Aleen|
+-------------------+----------------+----------+--------+
only showing top 10 rows

+--------+----------------+----------+--------+---------+-----------+-----+----------+
|trans_id|         cust_id|product_id|store_id| offer_id|offe

In [None]:
Check the transaction date and whether it falls within the valid period (start/end date) in the offer table
Visits at day level - pivot on the number of visits instead of the number of offers received
Convert the Spark SQL into DataFrame operations
ETL framework design

In [12]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import rand
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import covar_pop
from pyspark.sql.functions import input_file_name
import pandas as pd
import glob
import os
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import from_unixtime
from pyspark.sql.types import DateType
from pyspark.sql.functions import weekofyear

# Build (or reuse) the Spark session for this notebook.
# The Postgres JDBC driver package and the in-memory catalog are requested
# here so that a freshly started kernel gets them without depending on a
# special `pyspark --packages ...` launch. If a session already exists,
# getOrCreate() returns it and these settings are simply not re-applied.
spark = (
    SparkSession.builder.master("local[1]")
    .appName('Sparkusecase')
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.10")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

# Temptables cleaning process

# Remove any temp views left over from a previous run so the extraction
# below starts from a clean catalog.
for stale_view in ("cust_info_s", "trans_info_s", "offer_info_s"):
    spark.catalog.dropTempView(stale_view)

# List of table to extract from source server

tablename_list = ['cust_info_s','trans_info_s','offer_info_s']

# Data extraction from Postgres server for dependent tables
#
# The reader is built from the `spark` session created above rather than the
# `sqlContext` global, which only exists when the kernel is started through
# the pyspark shell. The password is taken from the POSTGRES_PASSWORD
# environment variable when set; the literal fallback only keeps existing
# local setups working and should be removed once the variable is in place.
url = "jdbc:postgresql://localhost:5432/postgres"
reader = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("user", "postgres")
    .option("password", os.environ.get("POSTGRES_PASSWORD", "puvi"))
    .option("driver", "org.postgresql.Driver")
)
for tablename in tablename_list:
    # createOrReplaceTempView replaces the deprecated registerTempTable and
    # registers each source table under its own name for the queries below.
    reader.option("dbtable", tablename).load().createOrReplaceTempView(tablename)


    
# Sample view of source tables extracted from Postgres server   

# Bind the SQL context explicitly to the session created above. The original
# relied on the shell-provided `sc` global, which is undefined in a plain
# kernel; passing the session also guarantees the temp views registered on
# `spark` are visible through `sqlc`.
sqlc = SQLContext(spark.sparkContext, spark)

#sqlc.sql("select * from cust_info_s").show(10)
#sqlc.sql("select * from trans_info_s").show(10)
#sqlc.sql("select * from offer_info_s").show(10)

# Transforming the data into business requirements

# Base frames for the DataFrame pipeline below. The date-cast column is
# aliased explicitly so later references to `cust_dob` do not depend on how
# Spark names an un-aliased cast expression.
df_trans_info = sqlc.sql("select * from trans_info_s")
df_offer_info = sqlc.sql("select * from offer_info_s")
df_cust_info = sqlc.sql("select DATE(cust_dob) AS cust_dob,cust_id,cust_name from cust_info_s")

#df_cust_info.printSchema()

# Per (date, cust_id): total sales and number of transactions (visits).
# The aggregates are aliased directly instead of renaming Spark's generated
# 'sum(sales)' / 'count(trans_id)' columns afterwards - withColumnRenamed is a
# silent no-op when the generated name does not match. The sort that used to
# precede groupBy is dropped: it cost a shuffle and its ordering is not
# preserved by the aggregation.
df_sales_data = df_trans_info.groupBy('date', 'cust_id').agg(
    F.sum('sales').alias('total_sales'),
    F.count('trans_id').alias('visits'),
)

#df_sales_data.show()

# Per-customer offer totals: offers are inner-joined to transactions on
# cust_id, then grouped by the offer table's cust_id to sum offer_redem and
# count the offer table's offer_id.
# The two renames rely on Spark's generated aggregate column names
# ('sum(offer_redem)', 'count(offer_id)').
# NOTE(review): df_offer_data_f is not referenced again in the visible part of
# this notebook; the validity-aware df_offer_data built further down appears to
# supersede it - confirm before removing.
df_offer_data_f = df_offer_info.join(df_trans_info,df_offer_info.cust_id ==  df_trans_info.cust_id,'inner') \
.orderBy(df_offer_info.cust_id).groupBy(df_offer_info.cust_id).agg(sum('offer_redem'),count(df_offer_info.offer_id)) \
.withColumnRenamed('sum(offer_redem)', 'no_offer_redem').withColumnRenamed('count(offer_id)', 'no_offer_received') 

#df_offer_data_f.show(truncate=False)

# Pair every transaction with the same customer's offers and flag whether the
# transaction date lies within the offer period: offer_validity is 1 when
# start_date <= date <= end_date, otherwise 0.

trans_with_offers = df_trans_info.join(
    df_offer_info,
    df_trans_info.cust_id == df_offer_info.cust_id,
    'inner',
)
within_offer_period = df_trans_info.date.between(df_offer_info.start_date, df_offer_info.end_date)

df_off = trans_with_offers.select(
    df_trans_info.cust_id,
    df_trans_info.date,
    df_offer_info.start_date,
    df_offer_info.end_date,
    df_offer_info.offer_id,
    df_trans_info.offer_redem,
    F.when(within_offer_period, 1).otherwise(0).alias('offer_validity'),
)

#df_off.show()


# Per (cust_id, offer_validity): sum of offer_redem and count of offer ids.
# Aggregates are aliased directly rather than renaming Spark's generated
# column names afterwards, and the sort that used to precede groupBy is
# dropped because the aggregation does not preserve it.
df_offer_data_check = df_off.groupBy(df_off.cust_id, df_off.offer_validity).agg(
    F.sum('offer_redem').alias('no_offer_redem'),
    F.count('offer_id').alias('no_offer_received'),
)

#df_offer_data.show(truncate=False)

# Rename the key so it does not clash with the sales frame's cust_id in the
# join that follows.
df_offer_data = df_offer_data_check.withColumnRenamed("cust_id", "off_cust_id")

#df_offer_data.show(truncate=False)

# Combine customer attributes, per-day sales and per-customer offer figures,
# and derive age, weekday abbreviation and week number.
# Age is whole months since cust_dob divided by 12, truncated to an int. The
# earlier round(..., 2) before the cast turned values such as 29.996 into
# 30.00, reporting the next age a day or two before the birthday.
# NOTE(review): the inner join on df_sales_data.cust_id drops customers
# without sales, so the preceding left join behaves like an inner join -
# confirm intent.
df_trans_data = (
    df_cust_info
    .join(df_sales_data, df_cust_info.cust_id == df_sales_data.cust_id, 'left')
    .join(df_offer_data, df_sales_data.cust_id == df_offer_data.off_cust_id, 'inner')
    .select(
        df_sales_data.date,
        df_sales_data.cust_id,
        df_cust_info.cust_name,
        df_cust_info.cust_dob,
        df_sales_data.total_sales,
        df_offer_data.no_offer_received,
        df_offer_data.no_offer_redem,
        df_offer_data.offer_validity,
        df_sales_data.visits,
        (F.months_between(F.current_date(), df_cust_info.cust_dob) / F.lit(12)).cast('int').alias('age'),
        F.date_format(F.to_date(df_sales_data.date), 'E').alias('days'),
        F.weekofyear(df_sales_data.date).alias('week_number'),
    )
)

# Derive on which day the customer visits most: the weekday column is spread
# into one column per day holding max(visits) for each combination of the key
# columns below; missing combinations are filled with 0.

weekly_keys = [
    'cust_id', 'cust_name', 'age', 'week_number',
    'no_offer_received', 'no_offer_redem', 'offer_validity',
    'total_sales', 'visits',
]
rows_by_customer = df_trans_data.orderBy('cust_id')
visits_per_day = rows_by_customer.groupby(*weekly_keys).pivot('days').max('visits')
final_dfs = visits_per_day.fillna(0)


# Sampling data - in case of job failure the data can be viewed in the Spark UI

final_dfs.show()

# Writing the final data into CSV files (single part file, header row, overwrite)

single_file_writer = final_dfs.repartition(1).write.mode("overwrite").option("header",True)
single_file_writer.csv("file:///C:/hdaoopdata/ics_weekly_data/")


+----------------+-------------------+---+-----------+-----------------+--------------+--------------+-----------+------+---+---+---+---+
|         cust_id|          cust_name|age|week_number|no_offer_received|no_offer_redem|offer_validity|total_sales|visits|Sat|Sun|Thu|Wed|
+----------------+-------------------+---+-----------+-----------------+--------------+--------------+-----------+------+---+---+---+---+
|4716095485621384|Tatum Rosenbaum Jr.| 29|          6|                6|             4|             1|         13|     3|  0|  3|  0|  0|
|4716744331063073|Dr. Carley Predovic| 37|         38|                2|             2|             0|          8|     1|  0|  0|  1|  0|
|4929800060892364|   Zula Ziemann PhD| 29|         36|                2|             0|             1|         10|     1|  1|  0|  0|  0|
|5370009829177288|      Lilla Weimann| 28|         34|                2|             2|             1|          5|     1|  0|  0|  0|  1|
+----------------+----------------