In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import * 
from pyspark.sql.functions import regexp_replace,split,substring
import findspark
import configparser

In [4]:
spark = SparkSession.builder \
.appName("JDBC and SQL") \
.master("yarn") \
.getOrCreate()

# Read Data

In [35]:
df = spark.read \
.format("csv") \
.option("header", True) \
.option("sep", ",") \
.option("inferSchema", True) \
.load("file:///home/train/datasets/dirty_store_transactions.csv")

                                                                                

In [36]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import  *

In [37]:
config = configparser.RawConfigParser()

config.read('./db_conn')
user_name = config.get('DB', 'user_name')
password = config.get('DB', 'password')
db_ip = config.get('DB', 'db_ip')

In [38]:
programmatical_schema = StructType([
        StructField("STORE_ID",StringType(),True),
        StructField("STORE_LOCATION",StringType(),True),
        StructField("PRODUCT_CATEGORY",StringType(),True),
        StructField("PRODUCT_ID",StringType(),True),
        StructField("MRP",StringType(),True),
        StructField("CP",StringType(),True),
        StructField("DISCOUNT",StringType(),True),
        StructField("SP",StringType(),True),
        StructField("Date",DateType(),True)
    ])

In [39]:
df2 = spark.read \
.format("csv") \
.option("header", True) \
.option("sep", ",") \
.schema(programmatical_schema) \
.load("file:///home/train/datasets/dirty_store_transactions.csv")

In [40]:
symbol_pattern = r"[^\w\s]"

In [41]:
df3 = df2.withColumn("STORE_LOCATION", regexp_replace("STORE_LOCATION", symbol_pattern, ""))

In [42]:
df4 = df3.withColumn("PRODUCT_ID", regexp_replace("PRODUCT_ID", symbol_pattern, ""))


In [43]:
df4.show(n=15, truncate=False)


+--------+--------------+----------------+----------+---+------+--------+------+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|CP    |DISCOUNT|SP    |Date      |
+--------+--------------+----------------+----------+---+------+--------+------+----------+
|YR7220  |New York      |Electronics     |12254943  |$31|$20.77|$1.86   |$29.14|2019-11-26|
|YR7220  |New York      |Furniture       |72619323C |$15|$9.75 |$1.5    |$13.5 |2019-11-26|
|YR7220  |New York      |Electronics     |34161682B |$88|$62.48|$4.4    |$83.6 |2019-11-26|
|YR7220  |New York      |Kitchen         |79411621  |$91|$58.24|$3.64   |$87.36|2019-11-26|
|YR7220  |New York      |Fashion         |39520263T |$85|$51   |$2.55   |$82.45|2019-11-26|
|YR7220  |New York      |Kitchen         |93809204  |$37|$24.05|$0.74   |$36.26|2019-11-26|
|YR7220  |New York      |Cosmetics       |86610412D |$80|$48.8 |$6.4    |$73.6 |2019-11-26|
|YR7220  |New York      |Kitchen         |52503356  |$71|$42.6 |$5.68   |$65.32|

In [44]:
df4.limit(5).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


In [45]:
from pyspark.sql.functions import split, col


df4 = df4.withColumn('Money Unity', substring('CP', 1, 1))

In [46]:

df4 = df4.withColumn("CP", split(col("CP"), "\$").getItem(1))
df4 = df4.withColumn("DISCOUNT", split(col("DISCOUNT"), "\$").getItem(1))
df4 = df4.withColumn("SP", split(col("SP"), "\$").getItem(1))
df4 = df4.withColumn("MRP", split(col("MRP"), "\$").getItem(1))

In [47]:
df4.show()

+--------+--------------+----------------+----------+---+-----+--------+-----+----------+-----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|   CP|DISCOUNT|   SP|      Date|Money Unity|
+--------+--------------+----------------+----------+---+-----+--------+-----+----------+-----------+
|  YR7220|      New York|     Electronics|  12254943| 31|20.77|    1.86|29.14|2019-11-26|          $|
|  YR7220|      New York|       Furniture| 72619323C| 15| 9.75|     1.5| 13.5|2019-11-26|          $|
|  YR7220|     New York |     Electronics| 34161682B| 88|62.48|     4.4| 83.6|2019-11-26|          $|
|  YR7220|      New York|         Kitchen|  79411621| 91|58.24|    3.64|87.36|2019-11-26|          $|
|  YR7220|      New York|         Fashion| 39520263T| 85|   51|    2.55|82.45|2019-11-26|          $|
|  YR7220|      New York|         Kitchen|  93809204| 37|24.05|    0.74|36.26|2019-11-26|          $|
|  YR7220|      New York|       Cosmetics| 86610412D| 80| 48.8|     6.4| 73.6|2019

In [None]:
(df4.write
.format("jdbc")
.mode("overwrite")
.option("driver", "org.postgresql.Driver")
.option("url", f"jdbc:postgresql://{db_ip}:5432/traindb")
.option("dbtable", "clean_transactions")
.option("user", user_name)
.option("password", password)
.save())

In [53]:
df5 = (spark.read.format("jdbc") 
.option("driver","org.postgresql.Driver")
.option("url",f"jdbc:postgresql://{db_ip}:5432/traindb")
.option("dbtable","clean_transactions")
.option("user",user_name)
.option("password",password)
.load())

In [54]:
df5.show()

+--------+--------------+----------------+----------+---+-----+--------+-----+----------+-----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|   CP|DISCOUNT|   SP|      Date|Money Unity|
+--------+--------------+----------------+----------+---+-----+--------+-----+----------+-----------+
|  YR7220|      New York|     Electronics|  12254943| 31|20.77|    1.86|29.14|2019-11-26|          $|
|  YR7220|      New York|       Furniture| 72619323C| 15| 9.75|     1.5| 13.5|2019-11-26|          $|
|  YR7220|     New York |     Electronics| 34161682B| 88|62.48|     4.4| 83.6|2019-11-26|          $|
|  YR7220|      New York|         Kitchen|  79411621| 91|58.24|    3.64|87.36|2019-11-26|          $|
|  YR7220|      New York|         Fashion| 39520263T| 85|   51|    2.55|82.45|2019-11-26|          $|
|  YR7220|      New York|         Kitchen|  93809204| 37|24.05|    0.74|36.26|2019-11-26|          $|
|  YR7220|      New York|       Cosmetics| 86610412D| 80| 48.8|     6.4| 73.6|2019