# Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Create spark session
spark = SparkSession.builder \
        .appName("Spark with Hive") \
        .enableHiveSupport() \
        .getOrCreate()

# Hardcoded data
data = [
    ["Product A", 1001, datetime.strptime("2023-07-20", "%Y-%m-%d"), datetime.strptime("2023-07-20 10:15:30", "%Y-%m-%d %H:%M:%S"), 29.99],
    ["Product B", 1002, datetime.strptime("2023-07-19", "%Y-%m-%d"), datetime.strptime("2023-07-19 14:20:45", "%Y-%m-%d %H:%M:%S"), 49.99],
    ["Product C", 1003, datetime.strptime("2023-07-18", "%Y-%m-%d"), datetime.strptime("2023-07-18 09:30:15", "%Y-%m-%d %H:%M:%S"), 39.99],
    ["Product D", 1004, datetime.strptime("2023-07-17", "%Y-%m-%d"), datetime.strptime("2023-07-17 16:45:00", "%Y-%m-%d %H:%M:%S"), 19.99]
]

# Define schema
schema = StructType([
    StructField("Product", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Price", FloatType(), True)
])

# Create dataframe
df = spark.createDataFrame(data, schema)

# Print schema
df.printSchema()

# Print Data
df.show()

24/07/21 10:44:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)



                                                                                

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



## Read Data From HDFS

In [2]:
# First read example should not infer schema, ignore header row, provide explicit column name and datatype

# Define schema
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_item_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("shipping_limit_date", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("freight_value", DoubleType(), True)
])

hdfs_path = '/spark_input_data/order_items_dataset.csv'

df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false').schema(schema).load(hdfs_path)

df.printSchema()

df.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

                                                                                

## Schema Inference

In [3]:
# Second read example should infer schema, ignore header row

hdfs_path = '/spark_input_data/order_items_dataset.csv'

df2 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)

df2.printSchema()

df2.show(5)

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48

## Repartition

In [4]:
# Number of partitions after reading from hdfs

print(f"Number of partitions: {df2.rdd.getNumPartitions()}")

df3 = df2.repartition(10)

# Number of partitions after repartition

print(f"Number of partitions: {df3.rdd.getNumPartitions()}")


Number of partitions: 2




Number of partitions: 10


## DataFrame Operations

In [5]:
# select columns in different options
from pyspark.sql.functions import *

# select one column
df3.select('order_id').show(5)
# select multiple columns
df3.select('order_id', 'shipping_limit_date').show(5)
# column aliasing
df3.select(col('order_id').alias('oid'), col('shipping_limit_date').alias('limit_date')).show(5)

                                                                                

+--------------------+
|            order_id|
+--------------------+
|6299bb8e855289b41...|
|71fbb9971d84bf97a...|
|74322a01b770c2ea3...|
|a23fc2b3af4f1a48e...|
|747af114bbea56ac1...|
+--------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|f0b47dadd5f372c41...|2018-05-14 04:54:53|
|bbd319ae8e4b46101...|2017-11-24 02:28:04|
|e16fb24453a306d5d...|2018-08-10 03:24:54|
|d4de6d0debe2df72c...|2017-03-13 03:35:12|
|bdbe8da70dcc6e6a2...|2018-04-22 21:52:25|
+--------------------+-------------------+
only showing top 5 rows

+--------------------+-------------------+
|                 oid|         limit_date|
+--------------------+-------------------+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|
|51c3d73e0e9052253...|2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

## Derive New Columns using withColumn 

In [6]:
df4 = df3.withColumn('year', year(col('shipping_limit_date'))).withColumn('month', month(col('shipping_limit_date')))

df4.select('order_id', 'shipping_limit_date', 'year', 'month').show(5)

+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|f0b47dadd5f372c41...|2018-05-14 04:54:53|2018|    5|
|bbd319ae8e4b46101...|2017-11-24 02:28:04|2017|   11|
|e16fb24453a306d5d...|2018-08-10 03:24:54|2018|    8|
|d4de6d0debe2df72c...|2017-03-13 03:35:12|2017|    3|
|bdbe8da70dcc6e6a2...|2018-04-22 21:52:25|2018|    4|
+--------------------+-------------------+----+-----+
only showing top 5 rows



## Renaming the Column

In [7]:
# renaming existing column using withColumnRenamed

df5 = df4.withColumnRenamed("shipping_limit_date", "shipping_limit_datetime")
df5.select("order_id", "shipping_limit_datetime").show(5)

+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|f0b47dadd5f372c41...|    2018-05-14 04:54:53|
|bbd319ae8e4b46101...|    2017-11-24 02:28:04|
|e16fb24453a306d5d...|    2018-08-10 03:24:54|
|d4de6d0debe2df72c...|    2017-03-13 03:35:12|
|bdbe8da70dcc6e6a2...|    2018-04-22 21:52:25|
+--------------------+-----------------------+
only showing top 5 rows



### Filter Conditions

filter conditions can be written 2 ways:
1. object-based
2. sql type of expressions

In [8]:
# filter condition

df5.filter(col("order_id") == "00010242fe8c5a6d1ba2dd792cb16214").show()

order_id_list = ["00010242fe8c5a6d1ba2dd792cb16214", "00018f77f2f0320c557190d7a144bdd3"]

df5.filter(col("order_id").isin(order_id_list)).show()

df5.filter((col("price")<50) & (col("freight_value")<10)).show(5)

# SQL type expression

df5.filter("price < 50 and freight_value < 10").show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+----

#### Drop Columns, Drop duplicates 

In [9]:
df5.drop('month').show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    2017-03-21 21:24:27|  19.9|         20.8|2017|
|23d16dddab46fd3d0...|            1|cca8e09ba6f2d35e4...|43f8c9950d11ecd03...|    2018-01-31 22:17:51|109.99|        14.52|2018|
|71c0d1686c9b55563...|            2|eb6c2ecde53034fc9...|1025f0e2d44d7041d...|    2017-12-01 19:3

In [10]:
# drop duplicates rows based on multiple columns 
df5.dropDuplicates(["order_id", "order_item_id"]).show(5)


[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13| 239.9|        19.93|2017|    5|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30| 199.0|        17.87|2018|    1|
|00048cc3ae777c65d...|            1|ef92defde845ab845...|6426d21aca402a131...|    2017-05-23 03:55:27|  21.9|        12.69|2017|    5|
|0005a1a1728c9d785...|            1|310ae3c140ff94b03...|a416b6a846a117243...|    2018-03-26 18:31:29|145.95|        11.65|2018|    3|
|0005f50442cb953dc...|            1|4535b0e1091c278df..

                                                                                

when drop duplicates whether shuffling will happen?
    yes, because to search data in all partitions. 
    even for distinct operation also shuffling will happen

In [11]:

df5.distinct().show(5)

# this will drop the records when entire record is duplicated. i.e. across all columns
df5.dropDuplicates().show(5)


                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|f4b56fe7668e96ac9...|            1|0e1cd2215a878ad18...|17eea220a40cc0d2c...|    2017-10-05 20:14:08|158.0|        36.43|2017|   10|
|de1f63780a06d4800...|            1|86ef699f48c083648...|850913d59ce317156...|    2018-03-15 10:15:30| 49.0|        18.23|2018|    3|
|f93af565de05d2427...|            1|2b4609f8948be1887...|cc419e0650a3c5ba7...|    2018-06-11 12:10:28|79.99|         8.32|2018|    6|
|dffe39ab35d34d2dc...|            1|8983a3b149303c013...|9f505651f4a6abe90...|    2018-08-22 14:30:02| 99.0|        15.79|2018|    8|
|f2db192253fbe0e93...|            1|2701fc4808fcb783d...|7f026



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|bb5cce57f8d80c481...|            1|6ec26b3516fecd18c...|c70c1b0d8ca86052f...|    2018-06-29 11:57:21|110.32|         8.03|2018|    6|
|d8fd9830792304098...|            1|a00d11a2119bd70d6...|79ebd9a61bac3eaf8...|    2018-07-16 11:11:49|  99.9|        19.89|2018|    7|
|c5119bb429cf05b92...|            1|192b332c511e484ea...|41b86b552e54e3a70...|    2017-04-04 02:15:19|  74.5|        14.69|2017|    4|
|e1041ca455e08b097...|            1|5a6e53c3b4e8684b1...|7299e27ed73d2ad98...|    2017-03-15 11:09:46| 12.99|        14.52|2017|    3|
|d7f4e2f755cf2f40a...|            1|cec09725da5ed0147..

                                                                                

### sorting 

In [12]:
# arrange data using order by

df5.orderBy(col("price").desc()).show(5)

df5.orderBy(col("price").asc(), col("freight_value").desc()).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|            3|05b515fdc76e888aa...|2709af9587499e95e...|    2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|           13|270516a3f41dc035a...|2709a

                                                                                

#### Group By Operations

In [13]:
# single column
df5.groupBy('year').agg(count("*").alias("total_count"),
                        min("price").alias("min_price"),
                        avg("price").alias("avg_price"),
                        max("price").alias("max_price"),
                        sum("price").alias("total_price")).show(5)

# multi column
df5.groupBy('year', 'month').agg(count("*").alias("total_count"),
                        min("price").alias("min_price"),
                        avg("price").alias("avg_price"),
                        max("price").alias("max_price"),
                        sum("price").alias("total_price")).orderBy(col("year").asc(), col("month").asc()).show(5)

                                                                                

+----+-----------+---------+-----------------+---------+------------------+
|year|total_count|min_price|        avg_price|max_price|       total_price|
+----+-----------+---------+-----------------+---------+------------------+
|2018|      62511|     0.85|120.0851568523971|   6729.0| 7506643.240000196|
|2017|      49765|      1.2|121.2673280417994|   6735.0| 6034868.580000147|
|2016|        370|      6.0|134.5565405405405|   1399.0|49785.919999999984|
|2020|          4|    69.99|            86.49|    99.99|            345.96|
+----+-----------+---------+-----------------+---------+------------------+





+----+-----+-----------+---------+------------------+---------+------------------+
|year|month|total_count|min_price|         avg_price|max_price|       total_price|
+----+-----+-----------+---------+------------------+---------+------------------+
|2016|    9|          4|    44.99| 48.61750000000001|     59.5|194.47000000000003|
|2016|   10|        365|      6.0|135.83712328767118|   1399.0| 49580.54999999998|
|2016|   12|          1|     10.9|              10.9|     10.9|              10.9|
|2017|    1|        681|      2.9|117.65747430249623|   1999.0| 80124.73999999993|
|2017|    2|       1866|      3.9| 131.8231564844589|   6735.0| 245982.0100000003|
+----+-----+-----------+---------+------------------+---------+------------------+
only showing top 5 rows



                                                                                

### Accumulators
Accumulator is like a global variable.
aggregate data across all partitions 

In [14]:
accum = spark.sparkContext.accumulator(0)  # the variable is initialized with value 0

df5.foreach(lambda row: accum.add(row["price"])) # each record price is added to accumulator

#accessed by driver
print(accum.value)



13591643.69999942


                                                                                

#### Case When Statement
usually we are deriving a new column by writting case when statement. 
so, in spark to add a new column we use withColumn which takes column name, logic to derive the column


In [15]:
df5.withColumn("price_category", when(col("price")>= 100, "High")
                                .when((col("price")<100) & (col("price")>=50), "Medium")
                                .otherwise("low")).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------+
|f82a3c6e1e4ad2996...|            1|4052517cac9e78357...|ce27a3cc3c8cc1ea7...|    2017-07-20 02:55:38| 150.0|        16.81|2017|    7|          High|
|fce3329932f72d112...|            1|0fa81e7123fd0ebe0...|da8622b14eb17ae28...|    2018-02-01 03:16:57| 109.9|        13.79|2018|    2|          High|
|febc913d8a07693e8...|            1|50f1880f198925172...|7a67c85e85bb2ce85...|    2017-12-18 19:09:57|139.99|        18.23|2017|   12|          High|
|f489dacdbad0317b1...|            1|1eb4a9aee05c9cad0...|dbc22125167c298ef...|    2017-11-20 11:50:4

                                                                                

#### Window Functions
How can we apply window functions in spark?
1. we need to import the Window class first
2. we need to define window spec i.e. partition by order by clauses

In [22]:
# Window functions
from pyspark.sql.window import Window

# dense_rank()
windowSpec = Window.partitionBy('year').orderBy(col('price').desc())

df5.withColumn('price_rank', dense_rank().over(windowSpec)).show(5)
                                                                 
# cummulative sum
windowSpec1 = Window.partitionBy('year').orderBy(col('shipping_limit_datetime').asc())

df5.withColumn('runnig_sum', sum('price').over(windowSpec1)).show(5)                                                       

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-------+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|  price|freight_value|year|month|price_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-------+-------------+----+-----+----------+
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13| 6729.0|       193.21|2018|    8|         1|
|8dbc85d1447242f3b...|            1|259037a6a41845e45...|c72de06d72748d1a0...|    2018-06-28 12:36:36| 4590.0|        91.78|2018|    6|         2|
|426a9742b533fc6fe...|            1|a1beef8f3992dbd4c...|512d298ac2a96d193...|    2018-08-16 14:24:28|4399.87|       113.45|2018|    8|         3|
|68101694e5c5dc733...|            1|6cdf8fc1d741c7658...|ed4acab38528488b6...|    2018-04-05 08:27:27|4099.99|        

[Stage 102:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|        runnig_sum|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|b2d1902261f105c5c...|            1|5ed3835ea6f96c77b...|aba1721a889e04dec...|    2018-01-01 22:08:31| 139.0|         8.23|2018|    1|             139.0|
|3c8e80909dd1066fd...|            1|4308439e0d80d5fe0...|59fb871bf6f4522a8...|    2018-01-01 22:13:24|179.99|         27.8|2018|    1|            318.99|
|f2e5bcbd102cd01f1...|            1|2bb3e85f2a403543f...|76d64c4aca3a7baf2...|    2018-01-01 22:27:15| 348.9|       118.06|2018|    1|            667.89|
|2e7080c8c24e4a977...|            1|3bdc89e963c6651b8...|fffd5413c0700ac82..

                                                                                

#### Joins


In [23]:
# Read sellers sample data with infer schema ignore header row

hdfs_path ="/spark_input_data/sellers_dataset.csv"
sdf = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)

sdf.printSchema()
sdf.show(10)

                                                                                

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
|c240c4061717ac180...|                 20920|   rio de janeiro|          RJ|
|e49c26c3edfa46d22...|                 55325|           brejao|          PE|
|1b938a7ec6ac5061a...|                 16

In [26]:
# inner join -- broadcast join

result1 = df5.join(broadcast(sdf), df5.seller_id == sdf.seller_id, 'inner').drop(sdf.seller_id)
result1.show(5)

# result = df5.join(sdf, df5.seller_id == sdf.seller_id, 'inner').join(pdf, df5.product_id == pdf.product_id, 'inner')
# result.show(10)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|                  1026|  sao paulo|          SP|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    201

In [27]:
# perform join with alias names of dataframes

result2 = df5.alias('oid').join(sdf.alias('sid'), col('oid.seller_id') == col('sid.seller_id'), 'inner').drop(col('sid.seller_id'))
result2.show(5)

[Stage 113:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|                  1026|  sao paulo|          SP|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    201

                                                                                

## PySpark SQL
To work with spark sql, we need to convert data frames in the Temporary View or Temporary Tables.
1. We will make use of CreateOrReplaceTempView() of dataframe. 
2. write your sql query statements.

The scope of these temp tables to spark session and dropped when session closes. 

In [28]:
# work with Spark SQL

df5.createOrReplaceTempView('ORDER_ITEMS') # this will convert the dataframe into temp table with the name order_items
sdf.createOrReplaceTempView('SELLERS')

jdf = spark.sql(" select * from ORDER_ITEMS oid join SELLERS sid on oid.seller_id == sid.seller_id")
jdf.show(10)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------------+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------------+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|1da3aeb70d7989d1e...|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|6da1992f915d77be9...|                  1026|  sao 

### Write Operations 

up until now we did read operations on spark application.


In [29]:
# write data in HDFS without any partition key

result1.write.format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result1/')
print("write successfully")

## Note: when you check the data in hdfs location, you can find the data is stored in 10 partitions. Because we did repartition to 10 earlier on df3 data frame that's why data is stored in 10 partitions
# Q: whether a new stage is created for this write operation in spark web ui?
# A: No, because here just 10 parallel tasks for wrote the data and there is no data shuffling or data did not move from one partition to another.



write successfully


                                                                                

If you have Hive dynamic partitioned external table which reads data from the below mentioned location 
then data will be automatically fetched by hive server for analysis without reloading the data into tables for each update.

In [None]:
# write data in HDFS with partition key

result1.write.partitionBy('year').format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result2/')
print("write successfully')
      
# Q: whether a new stage is created for this write operation in spark web ui?
# A: Yes a new stage will be created because data shuffling happens to look for specific years data to load into specific partition. 

In [30]:
# write data in HDFS into a single file

result1.coalesce(1).write.format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result3/')
print("write successfully")

[Stage 125:>                                                        (0 + 1) / 1]

write successfully


                                                                                

#### Incremental ingestion pipeline 
Here we are getting data from source location and then reading into spark application for doing some transfermations. 
load the transformed data into hive partitioned table.

In [None]:
# write data in HIVE directly
spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")

spark.sql("""USE spark_db""")

# create a partitioned hive table
spark.sql("""CREATE TABLE IF NOT EXISTS orders_sellers_data (
            order_id STRING,
            order_item_id INT,
            product_id STRING,
            price DOUBLE,
            freight_value DOUBLE,
            seller_city STRING
            )
            PARTITIONED BY (year INT)
        """)

# write data frame to hive table
result1.select('order_id',
               'order_item_id',
               'product_id',
               'price',
               'freight_value',
               'seller_city',
               'year').write.mode('append').insertInto('orders_sellers_data')

print("write successfully")

24/07/21 17:18:14 WARN SetCommand: 'SET hive.exec.dynamic.partition.mode=nonstrict' might not work, since Spark doesn't support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
24/07/21 17:18:14 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.