In [0]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('assignment_01').getOrCreate()
spark

In [0]:
Product_data =[('Washing Machine',1648770933000, 20000,'Samsung', 'India','0001'),
               ('Refrigerator',1648770999000,35000,' LG','null','0002'),
               ('Air Cooler',1648770948000,45000,' Voltas','null','0003')]

user_schema = ["Product_Name","Issue_Date","Price","Brand","Country","Product_number"]
ProductDF= spark.createDataFrame(data=Product_data,schema=user_schema)

ProductDF.printSchema()
display(ProductDF)

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: long (nullable = true)
 |-- Price: long (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_number: string (nullable = true)



Product_Name,Issue_Date,Price,Brand,Country,Product_number
Washing Machine,1648770933000,20000,Samsung,India,1
Refrigerator,1648770999000,35000,LG,,2
Air Cooler,1648770948000,45000,Voltas,,3


In [0]:
from pyspark.sql.functions import *
df=ProductDF.withColumn('Issue_Date_timestamp', from_unixtime(substring(col('Issue_Date'), 1, 10),"yyyy-MM-dd'T'HH:mm:ss[.SSS][ZZZ]"))
df.printSchema()
display(df)

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: long (nullable = true)
 |-- Price: long (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_number: string (nullable = true)
 |-- Issue_Date_timestamp: string (nullable = true)



Product_Name,Issue_Date,Price,Brand,Country,Product_number,Issue_Date_timestamp
Washing Machine,1648770933000,20000,Samsung,India,1,2022-03-31T23:55:33.000+0000
Refrigerator,1648770999000,35000,LG,,2,2022-03-31T23:56:39.000+0000
Air Cooler,1648770948000,45000,Voltas,,3,2022-03-31T23:55:48.000+0000


In [0]:
from pyspark.sql.functions import *

df2=ProductDF.withColumn('Issue_Date_date_type', from_unixtime(substring(col('Issue_Date'), 1, 10),"yyyy-MM-dd"))
display(df2)


Product_Name,Issue_Date,Price,Brand,Country,Product_number,Issue_Date_date_type
Washing Machine,1648770933000,20000,Samsung,India,1,2022-03-31
Refrigerator,1648770999000,35000,LG,,2,2022-03-31
Air Cooler,1648770948000,45000,Voltas,,3,2022-03-31


In [0]:
# Remove leading space of the column in pyspark
from pyspark.sql.functions import *
 
df_product = ProductDF.withColumn('Brand', ltrim(ProductDF.Brand))
df_product.show(truncate =False)

+---------------+----------+-----+-------+-------+--------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|
+---------------+----------+-----+-------+-------+--------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |
|Refrigerator   |1648770999|35000|LG     |null   |0002          |
|Air Cooler     |1648770948|45000|Voltas |null   |0003          |
+---------------+----------+-----+-------+-------+--------------+



In [0]:
from pyspark.sql.functions import when, regexp_replace

Remove_Null_Values = df_product.withColumn('Country',regexp_replace('Country', 'null', 'Empty_Values'))\
  .show(truncate=False)

+---------------+----------+-----+-------+------------+--------------+
|Product_Name   |Issue_Date|Price|Brand  |Country     |Product_number|
+---------------+----------+-----+-------+------------+--------------+
|Washing Machine|1648770933|20000|Samsung|India       |0001          |
|Refrigerator   |1648770999|35000|LG     |Empty_Values|0002          |
|Air Cooler     |1648770948|45000|Voltas |Empty_Values|0003          |
+---------------+----------+-----+-------+------------+--------------+



In [0]:
Products_Details =[(150711, 123456, 'EN', 456789, '2021-12-27T08:20:29.842+0000', '0001'),
                   (150439, 234567, 'UK', 345678, '2021-12-27T08:21:14.645+0000', '0002'),
                   (150647, 345678, 'ES', 234567, '2021-12-27T08:22:42.445+0000','0003')]

user_schema = ["SourceId","TransactionNumber","Language","ModelNumber","StartTime","ProductNumber" ]
Products_Details_DF= spark.createDataFrame(data=Products_Details,schema=user_schema)

Products_Details_DF.printSchema()
display(Products_Details_DF)

root
 |-- SourceId: long (nullable = true)
 |-- TransactionNumber: long (nullable = true)
 |-- Language: string (nullable = true)
 |-- ModelNumber: long (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- ProductNumber: string (nullable = true)



SourceId,TransactionNumber,Language,ModelNumber,StartTime,ProductNumber
150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
150439,234567,UK,345678,2021-12-27T08:21:14.645+0000,2
150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
change_df = Products_Details_DF.withColumnRenamed('SourceId', 'Source_id')\
.withColumnRenamed('TransactionNumber', 'Transaction_number')\
.withColumnRenamed('ModelNumber', 'Model_number')\
.withColumnRenamed('StartTime', 'Start_time')\
.withColumnRenamed('ProductNumber', 'Product_number')
display(change_df)

Source_id,Transaction_number,Language,Model_number,Start_time,Product_number
150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
150439,234567,UK,345678,2021-12-27T08:21:14.645+0000,2
150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
# sourceDF1= sourceDF1.withColumn('start_time_ms', concat(unix_timestamp(to_date(date_format('start_time',"yyyy-MM-dd HH:mm:ss.SSS"))), substring('start_time',21,3)))

from pyspark.sql.functions import *
Convertdf=change_df.withColumn('start_time_ms', concat(unix_timestamp(to_date(date_format('Start_time',"yyyy-MM-dd HH:mm:ss.SSS"))),substring('Start_time',21,3)))
Convertdf.printSchema()
display(Convertdf)

root
 |-- Source_id: long (nullable = true)
 |-- Transaction_number: long (nullable = true)
 |-- Language: string (nullable = true)
 |-- Model_number: long (nullable = true)
 |-- Start_time: string (nullable = true)
 |-- Product_number: string (nullable = true)
 |-- start_time_ms: string (nullable = true)



Source_id,Transaction_number,Language,Model_number,Start_time,Product_number,start_time_ms
150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1,1640563200842
150439,234567,UK,345678,2021-12-27T08:21:14.645+0000,2,1640563200645
150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3,1640563200445


In [0]:
# 1. get all the fields in return.

df_join= ProductDF.join(change_df,ProductDF.Product_number == change_df.Product_number, "Full")
display(df_join)

Product_Name,Issue_Date,Price,Brand,Country,Product_number,Source_id,Transaction_number,Language,Model_number,Start_time,Product_number.1
Washing Machine,1648770933000,20000,Samsung,India,1,150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
Refrigerator,1648770999000,35000,LG,,2,150439,234567,UK,345678,2021-12-27T08:21:14.645+0000,2
Air Cooler,1648770948000,45000,Voltas,,3,150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
# 2. get the country as EN
df_join.filter(df_join.Language == "EN").show(truncate=False)

df_join.select('Country').filter(df_join.Language == "EN").show(truncate=False)

+---------------+----------+-----+-------+-------+--------------+---------+------------------+--------+------------+----------------------------+--------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|Source_id|Transaction_number|Language|Model_number|Start_time                  |Product_number|
+---------------+----------+-----+-------+-------+--------------+---------+------------------+--------+------------+----------------------------+--------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |150711   |123456            |EN      |456789      |2021-12-27T08:20:29.842+0000|0001          |
+---------------+----------+-----+-------+-------+--------------+---------+------------------+--------+------------+----------------------------+--------------+

+-------+
|Country|
+-------+
|India  |
+-------+

