In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Create Spark session
spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .enableHiveSupport() \
    .getOrCreate()

data = [
    ["Product A", 1001, datetime.strptime("2023-07-20", "%Y-%m-%d"), datetime.strptime("2023-07-20 10:15:30", "%Y-%m-%d %H:%M:%S"), 29.99],
    ["Product B", 1002, datetime.strptime("2023-07-19", "%Y-%m-%d"), datetime.strptime("2023-07-19 14:20:45", "%Y-%m-%d %H:%M:%S"), 49.99],
    ["Product C", 1003, datetime.strptime("2023-07-18", "%Y-%m-%d"), datetime.strptime("2023-07-18 09:30:15", "%Y-%m-%d %H:%M:%S"), 39.99],
    ["Product D", 1004, datetime.strptime("2023-07-17", "%Y-%m-%d"), datetime.strptime("2023-07-17 16:45:00", "%Y-%m-%d %H:%M:%S"), 19.99]
]

# Define schema
schema = StructType([
    StructField("Product", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Price", FloatType(), True)
])

df = spark.createDataFrame(data, schema)

df.printSchema()

df.show()

root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



In [None]:
# First read example should not infer schema, ignore header row, provide explicit column name and datatype

# Define schema
schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderItemID", IntegerType(), True),
    StructField("ProductID", StringType(), True),
    StructField("SellerID", StringType(), True),
    StructField("ShippingLimitDate", TimestampType(), True),
    StructField("Price", DoubleType(), True),
    StructField("FreightValue", DoubleType(), True)
])

hdfs_path = '/tmp/spark_datasets/order_items_dataset.csv'

df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false').schema(schema).load(hdfs_path)

df.printSchema()

df.show(5)

root
 |-- OrderID: string (nullable = true)
 |-- OrderItemID: integer (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- SellerID: string (nullable = true)
 |-- ShippingLimitDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- FreightValue: double (nullable = true)



[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+-----------+--------------------+--------------------+-------------------+-----+------------+
|             OrderID|OrderItemID|           ProductID|            SellerID|  ShippingLimitDate|Price|FreightValue|
+--------------------+-----------+--------------------+--------------------+-------------------+-----+------------+
|00010242fe8c5a6d1...|          1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|       13.29|
|00018f77f2f0320c5...|          1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|       19.93|
|000229ec398224ef6...|          1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|       17.87|
|00024acbcdf0a6daa...|          1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|       12.79|
|00042b26cf59d7ce6...|          1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|       18.14|
+--------------------+-----------+--------------------+-----------------

                                                                                

In [None]:

# Second read example should infer schema, ignore header row

hdfs_path = '/tmp/spark_datasets/order_items_dataset.csv'

# File path pattern if we are reading from different hadoop cluster
# hdfs_path = 'hdfs://<namenode_host_ip_address>:<port>/tmp/spark_datasets/order_items_dataset.csv'

df = spark.read.format('csv').option('header','true').option('inferSchema','true').load(hdfs_path)

df.printSchema()

df.show()

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 

                                                                                

In [None]:
# check default number of partitions
print(spark.sparkContext.defaultParallelism)

2


In [None]:
# check default size of each partition
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))

134217728b


In [10]:
print(f'Number of partitions: {df.rdd.getNumPartitions()}')

df_new = df.repartition(5)

print(f'Number of partitions: {df_new.rdd.getNumPartitions()}')

Number of partitions: 2




Number of partitions: 5


In [None]:
# Select columns in different options

from pyspark.sql.functions import *

df.select('order_id').show(5)

df.select('order_id', 'shipping_limit_date').show(5)

df.select(col('order_id'), col('shipping_limit_date')).show(5)

df.select(col('order_id').alias('oid'), col('shipping_limit_date').alias('limit_date')).show(5)

+--------------------+
|            order_id|
+--------------------+
|00010242fe8c5a6d1...|
|00018f77f2f0320c5...|
|000229ec398224ef6...|
|00024acbcdf0a6daa...|
|00042b26cf59d7ce6...|
+--------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|
|00018f77f2f0320c5...|2017-05-03 11:05:13|
|000229ec398224ef6...|2018-01-18 14:48:30|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|
+--------------------+-------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|
|00018f77f2f0320c5...|2017-05-03 11:05:13|
|000229ec398224ef6...|2018-01-18 14:48:30|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|
+-----------

In [None]:
# Derive new column using withColumn

df2 = df.withColumn("year", year(col("shipping_limit_date"))).withColumn("month", month(col("shipping_limit_date")))

df2.select("order_id", "shipping_limit_date", "year", "month").show(5)

+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|2017|    9|
|00018f77f2f0320c5...|2017-05-03 11:05:13|2017|    5|
|000229ec398224ef6...|2018-01-18 14:48:30|2018|    1|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|2018|    8|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|2017|    2|
+--------------------+-------------------+----+-----+
only showing top 5 rows



In [None]:
# Rename existing column using withColumnRenamed

df3 = df2.withColumnRenamed('shipping_limit_date', 'shipping_limit_datetime')

df3.select("order_id", "shipping_limit_datetime").show(5)

+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|00010242fe8c5a6d1...|    2017-09-19 09:45:35|
|00018f77f2f0320c5...|    2017-05-03 11:05:13|
|000229ec398224ef6...|    2018-01-18 14:48:30|
|00024acbcdf0a6daa...|    2018-08-15 10:10:18|
|00042b26cf59d7ce6...|    2017-02-13 13:57:51|
+--------------------+-----------------------+
only showing top 5 rows



In [None]:
# Filter condition

df3.filter( col('order_id') == '00010242fe8c5a6d1ba2dd792cb16214' ).show(5)

order_ids = ['00010242fe8c5a6d1ba2dd792cb16214','00018f77f2f0320c557190d7a144bdd3']

df3.filter( col('order_id').isin(order_ids) ).show(5)

df3.filter( (col('price') < 50) & (col("freight_value")  < 10) ).show(5)

# SQL Type Expression
df3.filter("price < 50 and freight_value  < 10").show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+----

In [None]:
# Example to drop a column

df3.drop('month').show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30|199.0|        17.87|2018|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|    2018-08-15 10:10:18|12.99|        12.79|2018|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|    2017-02-13 13:57:51|19

In [None]:
#drop duplicates row based on multiple columns

df3.dropDuplicates(['order_id', 'order_item_id']).show(5)

# order_id , order_item_id, c1, c2
#   1      ,    2        , A , B
#   1      ,    2        , A , B
#   1      ,    2        , C , D
#   1      ,    3        , E , F

# order_id , order_item_id, c1, c2
#   1      ,    2        , A , B
#   1      ,    3        , E , F

[Stage 44:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13| 239.9|        19.93|2017|    5|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30| 199.0|        17.87|2018|    1|
|00048cc3ae777c65d...|            1|ef92defde845ab845...|6426d21aca402a131...|    2017-05-23 03:55:27|  21.9|        12.69|2017|    5|
|0005a1a1728c9d785...|            1|310ae3c140ff94b03...|a416b6a846a117243...|    2018-03-26 18:31:29|145.95|        11.65|2018|    3|
|0005f50442cb953dc...|            1|4535b0e1091c278df..

                                                                                

In [None]:
# get distinct rows

df3.dropDuplicates().show(5)

In [None]:
# arrange data using order by

df3.orderBy( col('price').desc() ).show(5)

df3.orderBy( col('price').asc(), col('freight_value').desc() ).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|            4|05b515fdc76e888aa...|2709af9587499e95e...|    2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|            2|05b515fdc76e888aa...|2709a

                                                                                

In [None]:
# Group By Operation

# on single column
df3.groupBy('year').agg( count('*').alias('total_count'),
                         avg('price').alias('avg_price'),
                         sum('price').alias('sum_price'),
                         min('price').alias('min_price'),
                         max('price').alias('max_price')
                       ).show(5)

# on multiple column
df3.groupBy('year', 'month').agg( count('*').alias('total_count'),
                                 avg('price').alias('avg_price'),
                                 sum('price').alias('sum_price'),
                                 min('price').alias('min_price'),
                                 max('price').alias('max_price')
                               ).orderBy( col('year').asc(), col('month').asc() ).show(20)

                                                                                

+----+-----------+------------------+-----------------+---------+---------+
|year|total_count|         avg_price|        sum_price|min_price|max_price|
+----+-----------+------------------+-----------------+---------+---------+
|2018|      62511|120.08515685240229| 7506643.24000052|     0.85|   6729.0|
|2017|      49765|121.26732804178806|6034868.579999583|      1.2|   6735.0|
|2020|          4|             86.49|           345.96|    69.99|    99.99|
|2016|        370|134.55654054054082| 49785.9200000001|      6.0|   1399.0|
+----+-----------+------------------+-----------------+---------+---------+





+----+-----+-----------+------------------+------------------+---------+---------+
|year|month|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----+-----------+------------------+------------------+---------+---------+
|2016|    9|          4|           48.6175|            194.47|    44.99|     59.5|
|2016|   10|        365|135.83712328767152|49580.550000000105|      6.0|   1399.0|
|2016|   12|          1|              10.9|              10.9|     10.9|     10.9|
|2017|    1|        681|117.65747430249674| 80124.74000000028|      2.9|   1999.0|
|2017|    2|       1866|131.82315648445825|245982.00999999908|      3.9|   6735.0|
|2017|    3|       2751| 124.7701199563779| 343242.5999999956|      4.9|   3999.9|
|2017|    4|       2364| 130.3503891708958| 308148.3199999977|      4.9|   4799.0|
|2017|    5|       4150| 121.8446891566257| 505655.4599999967|      3.5|   4690.0|
|2017|    6|       3801|123.38885819521052|469001.04999999516|     3.49|   6499.0|
|201

                                                                                

In [None]:
accum=spark.sparkContext.accumulator(0)

df3.foreach( lambda row: accum.add(row['price']) )

print(accum.value) #Accessed by driver



13591643.70000748


                                                                                

In [None]:
# Case-When statement

df3.withColumn("price_category", when( col('price') >= 100 , "High" )
                                .when( (col('price') < 100) & (col('price') >= 50) , "Medium" )
                                .otherwise("Low")).show(5)
                               

[Stage 61:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|        Medium|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|    5|          High|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30|199.0|        17.87|2018|    1|          High|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|    2018-08-15 10:10:18|12.9

                                                                                

In [None]:
# Window functions

from pyspark.sql.window import Window

windowSpec1 = Window.partitionBy('year').orderBy( col('price').asc())

df3.withColumn("dense_rank" , dense_rank().over(windowSpec1) ).show(5)

windowSpec2 = Window.partitionBy('year').orderBy(col('shipping_limit_datetime').asc())

df3.withColumn('running_sum', sum('price').over(windowSpec2)).select('year','price','shipping_limit_datetime','running_sum').show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|dense_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|f1d5c2e6867fa93ce...|            1|46fce52cef5caa7cc...|2d2322d8421188677...|    2018-08-28 21:30:15|  2.2|         7.39|2018|   

[Stage 70:>                                                         (0 + 1) / 1]

+----+-----+-----------------------+------------------+
|year|price|shipping_limit_datetime|       running_sum|
+----+-----+-----------------------+------------------+
|2016| 59.5|    2016-09-19 00:15:34|              59.5|
|2016|44.99|    2016-09-19 23:11:33|194.47000000000003|
|2016|44.99|    2016-09-19 23:11:33|194.47000000000003|
|2016|44.99|    2016-09-19 23:11:33|194.47000000000003|
|2016|29.99|    2016-10-08 10:34:01|224.46000000000004|
+----+-----+-----------------------+------------------+
only showing top 5 rows



                                                                                

In [39]:
hdfs_path = '/tmp/spark_datasets/sellers_dataset.csv'

sdf = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)

sdf.printSchema()
sdf.show(5)

                                                                                

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [None]:
# Join transformation

result1 = df3.join(broadcast(sdf), df3.seller_id == sdf.seller_id  , 'inner').drop(sdf.seller_id)

# df3.join(broadcast(sdf), df3.seller_id == sdf.seller_id  , 'inner').join(paydf, cond, 'left')

# result2 = result1.join(paydf, cond, 'left')

result1.printSchema()
result1.show(5)


root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|  seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+----------------------

In [None]:
# Perform join with alias names of dataframes

result2 = df3.alias('oid').join(sdf.alias('sid'), col('oid.seller_id') == col('sid.seller_id') , 'inner').drop(col('sid.seller_id'))


result2.printSchema()
result2.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|  seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+----------------------

In [None]:
# work with spark SQL

df3.createOrReplaceTempView("ORDER_ITEM")
sdf.createOrReplaceTempView("SELLERS")

joinedDF = spark.sql("select * from ORDER_ITEM oid INNER JOIN SELLERS sid ON oid.seller_id == sid.seller_id")

joinedDF.printSchema()

joinedDF.show(5)



root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------------+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|           seller_id|seller_zip_code_prefix|  seller_city|seller_state|
+----------------

In [None]:
# Write data in HDFS without any partition key

result1.write \
    .format('csv') \
    .mode('overwrite') \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .save('/tmp/spark_output/result1')

print("Write Successfull")



Write Successfull


                                                                                

In [None]:
# Write data in parquet format in HDFS without any partition key

result1.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('/tmp/spark_output/result_pq')

print("Write Successfull")



Write Successfull


                                                                                

In [None]:
# Write data in HDFS with partition key

result1.write.mode('overwrite').partitionBy('year').format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/spark_output/result2')
print("Write Successfull")





Write Successfull


                                                                                

In [None]:
# Write data in HDFS into single file

result1.coalesce(1).write.mode('overwrite').format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/spark_output/result3/')

print("Write Successfull")

[Stage 95:>                                                         (0 + 1) / 1]

Write Successfull


                                                                                

In [None]:
# Read json data in spark

file_path = '/tmp/spark_datasets/orders_json_data.json'

json_df = spark.read.json(file_path)

print("Original Schema :")
json_df.printSchema()

json_df.show(5)

# Explode the "purchases" array
purchases_df = json_df.select(
    "user_id",
    "name",
    "address",
    explode(col("purchases")).alias("purchase")
)

# Show the exploded schema
print("Exploded Schema:")
purchases_df.printSchema()
purchases_df.show(5)

# Further explode "items" within each purchase
items_df = purchases_df.select(
    "user_id",
    "name",
    "address",
    col("purchase.order_id").alias("order_id"),
    col("purchase.order_date").alias("order_date"),
    explode(col("purchase.items")).alias("item")
)

# Show the fully exploded DataFrame
print("Fully Exploded Data:")
items_df.show(truncate=False)

Original Schema :
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- name: string (nullable = true)
 |-- purchases: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- item_name: string (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |-- order_date: string (nullable = true)
 |    |    |-- order_id: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- user_id: string (nullable = true)

+--------------------+-----------+--------------------+-----------------+-------+
|             address|       name|           purchases|registration_date|user_id|
+--------------------+-----------+--------------------+-----------------+-------+
|{Chicago, 311 Exa...|User Name