In [1]:
from pyspark.sql import SparkSession

In [2]:
Spark=SparkSession.builder.master("local[1]").appName("Dataframe-creation-transformations").getOrCreate()

In [3]:
Spark

In [5]:
#To create dataframe from a local list or local file , we use createDataFrame

In [6]:
order_list=[(1,'2013-07-25',11599,'CLOSED'),(2,'2013-07-25',256,'PENDING'),(3,'2013-07-25',12111,'COMPLETE')]

In [7]:
orders_df=Spark.createDataFrame(order_list)

In [8]:
orders_df.show()

+---+----------+-----+--------+
| _1|        _2|   _3|      _4|
+---+----------+-----+--------+
|  1|2013-07-25|11599|  CLOSED|
|  2|2013-07-25|  256| PENDING|
|  3|2013-07-25|12111|COMPLETE|
+---+----------+-----+--------+



In [9]:
orders_df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: string (nullable = true)



In [15]:
#inorder to add header we can use toDF() , which is one method

In [12]:
orders_df1=orders_df.toDF('order_id','order_date','customer_id','order_status')

In [13]:
orders_df1.show()

+--------+----------+-----------+------------+
|order_id|order_date|customer_id|order_status|
+--------+----------+-----------+------------+
|       1|2013-07-25|      11599|      CLOSED|
|       2|2013-07-25|        256|     PENDING|
|       3|2013-07-25|      12111|    COMPLETE|
+--------+----------+-----------+------------+



In [14]:
orders_df1.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [16]:
# another method is that

In [20]:
order_schema='order_id long,order_date string,customer_id long,order_status string'

In [21]:
orders_df_schema=Spark.createDataFrame(order_list,order_schema)

In [22]:
orders_df_schema.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [4]:
from pyspark.sql.functions import *

In [25]:
orders_df_schema_new=orders_df_schema.withColumn("order_date",to_timestamp("order_date"))

In [26]:
orders_df_schema_new.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [35]:
# how to convert rdd to dataframe

In [38]:
rdd=Spark.sparkContext.textFile(r"C:\Users\user\OneDrive\Desktop\Big-data-trendytech\SAMPLE_ORDERS.csv")

In [39]:
rdd.collect()

['1,2013-07-25,11599,CLOSED',
 '2,2013-07-26,256,COMPLETE',
 '3,2013-07-27,12111,CLOSED',
 '4,2013-08-28,8827,PENDING',
 '5,2013-07-29,11318,COMPLETE']

In [43]:
rdd_map=rdd.map(lambda x:(int(x.split(',')[0]),x.split(',')[1],int(x.split(',')[2]),x.split(',')[3]))

In [44]:
rdd_map.collect()

[(1, '2013-07-25', 11599, 'CLOSED'),
 (2, '2013-07-26', 256, 'COMPLETE'),
 (3, '2013-07-27', 12111, 'CLOSED'),
 (4, '2013-08-28', 8827, 'PENDING'),
 (5, '2013-07-29', 11318, 'COMPLETE')]

In [45]:
#two ways we can convert rdd to dataframe
#1.using createDataframe
#2.using toDF()

In [46]:
df_m1=Spark.createDataFrame(rdd_map,order_schema)

In [47]:
df_m1.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [48]:
df_m2=rdd_map.toDF(order_schema)

In [49]:
df_m2.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [37]:
rdd=Spark.sparkContext.textFile(r"C:\Users\user\OneDrive\Desktop\Big-data-trendytech\udemy\RetailDB+SalesData\RetailDB SalesData\Order_items\part-00000.txt")

In [38]:
rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [18]:
rdd_map=rdd.map(lambda x:(int(x.split(',')[0]),int(x.split(',')[1]),int(x.split(',')[2]),int(x.split(',')[3]),float(x.split(',')[4]),float(x.split(',')[5])))

In [20]:
rdd_map.take(5)

[(1, 1, 957, 1, 299.98, 299.98),
 (2, 2, 1073, 1, 199.99, 199.99),
 (3, 2, 502, 5, 250.0, 50.0),
 (4, 2, 403, 1, 129.99, 129.99),
 (5, 4, 897, 2, 49.98, 24.99)]

In [21]:
product_schema="order_item_id long,order_id long,product_id long,quantity int,subtotal float,product_price float"

In [22]:
df_pdt=Spark.createDataFrame(rdd_map,product_schema)

In [23]:
df_pdt.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- subtotal: float (nullable = true)
 |-- product_price: float (nullable = true)



In [24]:
df_pdt.show(3)

+-------------+--------+----------+--------+--------+-------------+
|order_item_id|order_id|product_id|quantity|subtotal|product_price|
+-------------+--------+----------+--------+--------+-------------+
|            1|       1|       957|       1|  299.98|       299.98|
|            2|       2|      1073|       1|  199.99|       199.99|
|            3|       2|       502|       5|   250.0|         50.0|
+-------------+--------+----------+--------+--------+-------------+
only showing top 3 rows



In [25]:
#drop column subtotal

In [26]:
drop_df_pdt=df_pdt.drop("subtotal")

In [27]:
drop_df_pdt.printSchema()

root
 |-- order_item_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- product_price: float (nullable = true)



In [34]:
drop_df_pdt_new=drop_df_pdt.select("*",expr("product_price*quantity as subtotal1")).show(2)

+-------------+--------+----------+--------+-------------+---------+
|order_item_id|order_id|product_id|quantity|product_price|subtotal1|
+-------------+--------+----------+--------+-------------+---------+
|            1|       1|       957|       1|       299.98|   299.98|
|            2|       2|      1073|       1|       199.99|   199.99|
+-------------+--------+----------+--------+-------------+---------+
only showing top 2 rows



In [30]:
#if we have more than one calculation , we can use selectExpr instead of  select and then expr infront of the calculation

In [33]:
product=drop_df_pdt.selectExpr("*","product_price*quantity as subtotal1").show(2)

+-------------+--------+----------+--------+-------------+---------+
|order_item_id|order_id|product_id|quantity|product_price|subtotal1|
+-------------+--------+----------+--------+-------------+---------+
|            1|       1|       957|       1|       299.98|   299.98|
|            2|       2|      1073|       1|       199.99|   199.99|
+-------------+--------+----------+--------+-------------+---------+
only showing top 2 rows



In [39]:
pdt_wth_name=Spark.sparkContext.textFile(r"C:\Users\user\OneDrive\Desktop\Big-data-trendytech\udemy\RetailDB+SalesData\RetailDB SalesData\Order_items\part-00000 - name.txt")

In [40]:
pdt_wth_name_map=pdt_wth_name.map(lambda x:(int(x.split(',')[0]),int(x.split(',')[1]),int(x.split(',')[2]),int(x.split(',')[3]),float(x.split(',')[4]),float(x.split(',')[5]),x.split(',')[6]))

In [42]:
product_schema_name="order_item_id long,order_id long,product_id long,quantity int,subtotal float,product_price float,pdt_name string"

In [43]:
df_pdt_wth_name_map=Spark.createDataFrame(pdt_wth_name_map,product_schema_name)

In [44]:
df_pdt_wth_name_map.show(2)

+-------------+--------+----------+--------+--------+-------------+----------------+
|order_item_id|order_id|product_id|quantity|subtotal|product_price|        pdt_name|
+-------------+--------+----------+--------+--------+-------------+----------------+
|            1|       1|       957|       1|  299.98|       299.98|        Nike dfs|
|            2|       2|      1073|       1|  199.99|       199.99|fdg Armour ghdhd|
+-------------+--------+----------+--------+--------+-------------+----------------+
only showing top 2 rows



In [45]:
#need to increase the price of each item 20%

In [46]:
new_df2=df_pdt_wth_name_map.withColumn("product_price",expr("product_price*1.2"))

In [48]:
new_df2.show(2)

+-------------+--------+----------+--------+--------+------------------+----------------+
|order_item_id|order_id|product_id|quantity|subtotal|     product_price|        pdt_name|
+-------------+--------+----------+--------+--------+------------------+----------------+
|            1|       1|       957|       1|  299.98|359.97601318359375|        Nike dfs|
|            2|       2|      1073|       1|  199.99|239.98800659179688|fdg Armour ghdhd|
+-------------+--------+----------+--------+--------+------------------+----------------+
only showing top 2 rows



In [49]:
#I need to increase 20 percent for Nike products,10% for Armour products and no change for other products

In [50]:
new_df3=df_pdt_wth_name_map.withColumn("product_price",expr("CASE WHEN pdt_name like '%Nike%' THEN product_price*1.2 WHEN pdt_name like '%ARMOUR%' THEN product_price*1.1 ELSE product_price END"))

In [51]:
new_df3.show(5)

+-------------+--------+----------+--------+--------+------------------+----------------+
|order_item_id|order_id|product_id|quantity|subtotal|     product_price|        pdt_name|
+-------------+--------+----------+--------+--------+------------------+----------------+
|            1|       1|       957|       1|  299.98|359.97601318359375|        Nike dfs|
|            2|       2|      1073|       1|  199.99|199.99000549316406|fdg Armour ghdhd|
|            3|       2|       502|       5|   250.0|              50.0|fdg Armour ghdhd|
|            4|       2|       403|       1|  129.99|129.99000549316406|fdg Armour ghdhd|
|            5|       4|       897|       2|   49.98|29.987999725341794|        Nike dfs|
+-------------+--------+----------+--------+--------+------------------+----------------+
only showing top 5 rows



In [52]:
#How to remove duplicate records from dataframe

In [53]:
mylist=[(1,"kapil",34),(1,"kapil",34),(1,"satish",26),(2,"satish",26)]

In [54]:
rough_df=Spark.createDataFrame(mylist).toDF("id","name","age")

In [55]:
rough_df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1| kapil| 34|
|  1| kapil| 34|
|  1|satish| 26|
|  2|satish| 26|
+---+------+---+



In [56]:
rough_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [58]:
#we can use distinct() to remove duplicates but it will remove records if all records are duplicate

In [60]:
rough_df1=rough_df.distinct()

In [61]:
rough_df1.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|satish| 26|
|  2|satish| 26|
|  1| kapil| 34|
+---+------+---+



In [62]:
rough_df.select("id").distinct().show()

+---+
| id|
+---+
|  1|
|  2|
+---+



In [63]:
# another way of removing duplicates is dropDuplicates()

In [64]:
rough_df.dropDuplicates().show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|satish| 26|
|  2|satish| 26|
|  1| kapil| 34|
+---+------+---+



In [65]:
rough_df.dropDuplicates(["name","age"]).show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1| kapil| 34|
|  1|satish| 26|
+---+------+---+

