In [1]:
#goal is to  1-clean and convert unstructured data to structured
#            2-write df to hive with orc format
#            3-write data do postgresql table
#            4-write data to hdfs as parquet format and compress with snappy

#download the dataset
!wget https://raw.githubusercontent.com/erkansirin78/datasets/master/dirty_store_transactions.csv


--2024-03-18 21:52:27--  https://raw.githubusercontent.com/erkansirin78/datasets/master/dirty_store_transactions.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2609524 (2.5M) [text/plain]
Saving to: ‘dirty_store_transactions.csv’


2024-03-18 21:52:27 (168 MB/s) - ‘dirty_store_transactions.csv’ saved [2609524/2609524]



In [None]:
#imports and creating the spark session
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("case_study_cleaning_data") \
.master("yarn") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()

In [None]:
#read csv to df
df = spark.read.format("csv") \
.option("header",True) \
.option("inferSchema",True) \
.option("sep",",") \
.load("file:///home/*youruser/my_pyspark/dirty_store_transactions.csv")

In [None]:
#check the df, need to clean the unwanted characters on store location and dollar signs on the other columns. Turn date column data format into date format.
df.limit(5).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York(,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York+,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York!,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


In [None]:
#clean and set the data types
df1 = df.withColumn("STORE_LOCATION", F.regexp_replace("STORE_LOCATION", "[^0-9a-zA-Z_\-]+", ""))
df1.limit(5).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,NewYork,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


In [None]:
#clean the dollar signs and set the date format
df2 = df1.withColumn("MRP",F.regexp_replace(F.col("MRP"),"\$","").cast(FloatType())) \
.withColumn("CP",F.regexp_replace(F.col("CP"),"\$","").cast(FloatType())) \
.withColumn("DISCOUNT",F.regexp_replace(F.col("DISCOUNT"),"\$","").cast(FloatType())) \
.withColumn("SP",F.regexp_replace(F.col("SP"),"\$","").cast(FloatType())) \
.withColumn("Date_Casted",F.to_date(F.col("Date"),"yyyy-MM-dd")).drop("Date")

df2.printSchema()


root
 |-- STORE_ID: string (nullable = true)
 |-- STORE_LOCATION: string (nullable = true)
 |-- PRODUCT_CATEGORY: string (nullable = true)
 |-- PRODUCT_ID: string (nullable = true)
 |-- MRP: float (nullable = true)
 |-- CP: float (nullable = true)
 |-- DISCOUNT: float (nullable = true)
 |-- SP: float (nullable = true)
 |-- Date_Casted: date (nullable = true)



In [None]:
#limit ile oku kontrol et
df2.limit(10).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,91.0,58.240002,3.64,87.360001,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,85.0,51.0,2.55,82.449997,2019-11-26
5,YR7220,NewYork,Kitchen,93809204,37.0,24.049999,0.74,36.259998,2019-11-26
6,YR7220,NewYork,Cosmetics,86610412D,80.0,48.799999,6.4,73.599998,2019-11-26
7,YR7220,NewYork,Kitchen,52503356^,71.0,42.599998,5.68,65.32,2019-11-26
8,YR7220,NewYork,Kitchen,77516479,92.0,56.119999,3.68,88.32,2019-11-26
9,YR7220,NewYork,Cosmetics,47334289,16.0,10.72,0.96,15.04,2019-11-26


In [None]:
#transfer data into hive, check the databases and existing tables first.
spark.sql("show databases;").show()

+---------+
|namespace|
+---------+
|bookstore|
|  default|
|movielens|
|    test1|
|    train|
+---------+



In [None]:
spark.sql("show tables;").show()

In [None]:
#will be working under test1 database
spark.sql("use test1;")

DataFrame[]

In [None]:
#
df2.write \
.format("orc") \
.mode("overwrite") \
.saveAsTable("test1.clean_transactions")

In [None]:
#check the table with SELECT
spark.sql("select * from test1.clean_transactions").limit(10).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,91.0,58.240002,3.64,87.360001,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,85.0,51.0,2.55,82.449997,2019-11-26
5,YR7220,NewYork,Kitchen,93809204,37.0,24.049999,0.74,36.259998,2019-11-26
6,YR7220,NewYork,Cosmetics,86610412D,80.0,48.799999,6.4,73.599998,2019-11-26
7,YR7220,NewYork,Kitchen,52503356^,71.0,42.599998,5.68,65.32,2019-11-26
8,YR7220,NewYork,Kitchen,77516479,92.0,56.119999,3.68,88.32,2019-11-26
9,YR7220,NewYork,Cosmetics,47334289,16.0,10.72,0.96,15.04,2019-11-26


In [None]:
#postgresql
jdbc_url= "jdbc:postgresql://localhost/userdb?user=*youruser&password=*****"

In [None]:

df2.write.jdbc(url = jdbc_url, table="clean_transactitons_table", mode = "overwrite", properties={"driver":"org.postgresql.Driver"})

In [None]:
#check the table
spark.read.format("jdbc").option("url",jdbc_url).option("query","Select * from clean_transactitons_table").option("driver","org.postgresql.Driver") \
.load().limit(5).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,91.0,58.240002,3.64,87.360001,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,85.0,51.0,2.55,82.449997,2019-11-26


In [None]:
#write to hdfs
df2.write.format("parquet").option("compression","snappy").save("hdfs://localhost:9000/user/train/spark_odev_transaction")

In [None]:
#check hdfs
!hdfs dfs -ls /user/*youruser/spark_odev_transaction

Found 2 items
-rw-r--r--   1 train supergroup          0 2024-03-19 00:22 /user/train/spark_odev_transaction/_SUCCESS
-rw-r--r--   1 train supergroup     244857 2024-03-19 00:22 /user/train/spark_odev_transaction/part-00000-3aad7b2c-8b13-4851-89f1-20ceb28dba2d-c000.snappy.parquet
