In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Build (or reuse) a Spark session with Hive support switched on
spark = (
    SparkSession.builder
    .appName("Spark with Hive")
    .enableHiveSupport()
    .getOrCreate()
)

# Hardcoded sample rows, kept as plain literals: (product, id, date, timestamp, price)
_DAY_FORMAT = "%Y-%m-%d"
_STAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
_sample_rows = [
    ("Product A", 1001, "2023-07-20", "2023-07-20 10:15:30", 29.99),
    ("Product B", 1002, "2023-07-19", "2023-07-19 14:20:45", 49.99),
    ("Product C", 1003, "2023-07-18", "2023-07-18 09:30:15", 39.99),
    ("Product D", 1004, "2023-07-17", "2023-07-17 16:45:00", 19.99),
]

# Parse the date / timestamp strings into datetime objects, one list per row
data = [
    [
        product,
        product_id,
        datetime.strptime(day, _DAY_FORMAT),
        datetime.strptime(stamp, _STAMP_FORMAT),
        price,
    ]
    for product, product_id, day, stamp, price in _sample_rows
]

# Column name -> Spark type; every column is declared nullable
_sample_fields = [
    ("Product", StringType()),
    ("ID", IntegerType()),
    ("Date", DateType()),
    ("Timestamp", TimestampType()),
    ("Price", FloatType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _sample_fields
])

# Turn the rows into a DataFrame that follows the schema above
df = spark.createDataFrame(data, schema)

# Show the resulting column types
df.printSchema()

# Show the rows
df.show()


23/07/21 12:57:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)



                                                                                

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



In [48]:
# Read #1: column names and types are supplied explicitly instead of being inferred.
# header=true tells the CSV reader that the first line of the file holds column names.

# (column name, Spark type) pairs; every column is declared nullable
_order_item_fields = [
    ("order_id", StringType()),
    ("order_item_id", IntegerType()),
    ("product_id", StringType()),
    ("seller_id", StringType()),
    ("shipping_limit_date", TimestampType()),
    ("price", DoubleType()),
    ("freight_value", DoubleType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _order_item_fields
])

hdfs_path = '/tmp/input_data/order_items_dataset.csv'
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'false')
    .schema(schema)
    .load(hdfs_path)
)

df.printSchema()

df.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



[Stage 171:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

                                                                                

In [49]:
# Read #2: same file, but this time Spark is asked to infer the column types.
# header=true again marks the first line as the column-name row.

hdfs_path = '/tmp/input_data/order_items_dataset.csv'
df2 = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(hdfs_path)
)

# Inspect the inferred types and the first few rows
df2.printSchema()
df2.show(5)

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48

In [50]:
# Partition count of the DataFrame as it was read from HDFS
partitions_before = df2.rdd.getNumPartitions()
print(f'Number of partitions: {partitions_before}')

# Redistribute the rows across 10 partitions (returns a new DataFrame)
df3 = df2.repartition(10)

# Partition count after the repartition
partitions_after = df3.rdd.getNumPartitions()
print(f'Number of partitions: {partitions_after}')

Number of partitions: 2




Number of partitions: 10


In [51]:
# Select columns in different options
from pyspark.sql.functions import *

# Four equivalent ways of projecting columns, shown one after another:
#   1. one column by name
#   2. several columns by name
#   3. Column objects built with col()
#   4. Column objects renamed with alias()
order_id_col = col('order_id')
limit_date_col = col('shipping_limit_date')

projections = [
    ['order_id'],
    ['order_id', 'shipping_limit_date'],
    [order_id_col, limit_date_col],
    [order_id_col.alias('oid'), limit_date_col.alias('limit_date')],
]
for projection in projections:
    df3.select(*projection).show(5)

                                                                                

+--------------------+
|            order_id|
+--------------------+
|a4946418011e1f657...|
|d805508e2cdf3a5b1...|
|c502981c13af31e66...|
|a5d049fb139be4e41...|
|c3dbc724471dca9ff...|
+--------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|8da1b0cac91830c54...|2018-02-21 16:07:37|
|1190d4d9e67a90f9d...|2017-04-26 22:32:17|
|53170b475d08133ec...|2018-05-01 21:12:51|
|087889e1fca3301cc...|2017-12-08 09:30:21|
|8eadc079bd4cf0c38...|2018-04-11 14:35:28|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|ab9f455e1c97dd7b2...|2017-12-26 09:38:50|
|c1fbcbfba6084a6a4...|2018-05-16 17:37:43|
|e761005140dd93f80...|2017-11-21 20:26:29|
|b16fc35ee8e5ad288...|2017-07-26 22:10:25|
|bee3bdf96f440b404...|2018-02-02 19:15:32|
+--------------------+-------------------+
only showing top 5 rows



[Stage 185:>                                                        (0 + 2) / 2]

+--------------------+-------------------+
|                 oid|         limit_date|
+--------------------+-------------------+
|ab9f455e1c97dd7b2...|2017-12-26 09:38:50|
|c1fbcbfba6084a6a4...|2018-05-16 17:37:43|
|e761005140dd93f80...|2017-11-21 20:26:29|
|b16fc35ee8e5ad288...|2017-07-26 22:10:25|
|bee3bdf96f440b404...|2018-02-02 19:15:32|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [52]:
# Add "year" and "month" columns extracted from the shipping-limit timestamp
limit_ts = col("shipping_limit_date")
df4 = (
    df3
    .withColumn("year", year(limit_ts))
    .withColumn("month", month(limit_ts))
)

df4.select("order_id", "shipping_limit_date", "year", "month").show(5)



+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|ab9f455e1c97dd7b2...|2017-12-26 09:38:50|2017|   12|
|c1fbcbfba6084a6a4...|2018-05-16 17:37:43|2018|    5|
|e761005140dd93f80...|2017-11-21 20:26:29|2017|   11|
|b16fc35ee8e5ad288...|2017-07-26 22:10:25|2017|    7|
|bee3bdf96f440b404...|2018-02-02 19:15:32|2018|    2|
+--------------------+-------------------+----+-----+
only showing top 5 rows



                                                                                

In [53]:
# withColumnRenamed gives back a new DataFrame in which the column carries the new name
old_name, new_name = 'shipping_limit_date', 'shipping_limit_datetime'
df5 = df4.withColumnRenamed(old_name, new_name)

df5.select("order_id", new_name).show(5)

[Stage 191:>                                                        (0 + 2) / 2]

+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|8da1b0cac91830c54...|    2018-02-21 16:07:37|
|1190d4d9e67a90f9d...|    2017-04-26 22:32:17|
|53170b475d08133ec...|    2018-05-01 21:12:51|
|087889e1fca3301cc...|    2017-12-08 09:30:21|
|8eadc079bd4cf0c38...|    2018-04-11 14:35:28|
+--------------------+-----------------------+
only showing top 5 rows



                                                                                

In [54]:
# Row filtering, four ways

# 1. equality against a single value
wanted_order = '00010242fe8c5a6d1ba2dd792cb16214'
df5.filter(col("order_id") == wanted_order).show(5)

# 2. membership in a list of values
order_li = ['00010242fe8c5a6d1ba2dd792cb16214','00018f77f2f0320c557190d7a144bdd3']
in_order_list = col("order_id").isin(order_li)
df5.filter(in_order_list).show(5)

# 3. two column conditions combined with & (each side is its own Column expression)
low_price = col("price") < 50
low_freight = col("freight_value")  < 10
df5.filter(low_price & low_freight).show(5)

# 4. the same predicate written as a SQL expression string
df5.filter("price < 50 and freight_value  < 10").show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+



                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|    5|
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+



                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c53b0fc1c72d7bda1...|            1|d8707dd860f966067...|31da954dc0855f249...|    2018-04-09 05:28:25| 47.0|         7.71|2018|    4|
|cacbbea6c8fa8e5cf...|            1|57b82771ce003dac0...|951e8cef368f09bb3...|    2018-08-20 22:45:14|38.99|         9.49|2018|    8|
|d5ea2169e37b50c52...|            1|7344630ca6c1a4917...|db4350fd57ae30082...|    2017-06-05 04:42:44| 16.9|         5.98|2017|    6|
|d32c8abcdc749f46d...|            1|2ada8214e59b86de7...|c1552b1dab6e6f760...|    2018-08-28 14:35:25| 34.9|          7.5|2018|    8|
|d47e5fbb24cade152...|            1|1ed4a7921293b8e5e...|6edac



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c53b0fc1c72d7bda1...|            1|d8707dd860f966067...|31da954dc0855f249...|    2018-04-09 05:28:25| 47.0|         7.71|2018|    4|
|cacbbea6c8fa8e5cf...|            1|57b82771ce003dac0...|951e8cef368f09bb3...|    2018-08-20 22:45:14|38.99|         9.49|2018|    8|
|d5ea2169e37b50c52...|            1|7344630ca6c1a4917...|db4350fd57ae30082...|    2017-06-05 04:42:44| 16.9|         5.98|2017|    6|
|d32c8abcdc749f46d...|            1|2ada8214e59b86de7...|c1552b1dab6e6f760...|    2018-08-28 14:35:25| 34.9|          7.5|2018|    8|
|d47e5fbb24cade152...|            1|1ed4a7921293b8e5e...|6edac

                                                                                

In [21]:
# drop(): returns a DataFrame without the named column (the result is only displayed, not kept)
without_month = df5.drop('month')
without_month.show(5)

In [22]:
# De-duplicate on a subset of columns: rows count as duplicates when these keys match
dedup_keys = ['order_id', 'order_item_id']
df5.dropDuplicates(dedup_keys).show(5)

In [23]:
# Two spellings for "distinct rows": distinct() and dropDuplicates() with no column list
for distinct_rows in (df5.distinct(), df5.dropDuplicates()):
    distinct_rows.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|24e7eb8286ce5ef52...|            1|6fd08d44046ab994b...|d9a84e1403de8da0c...|    2018-02-28 15:31:07| 39.9|         15.1|2018|    2|
|5f827831438fdab57...|            1|3096d513ef378c2f6...|6560211a19b47992c...|    2018-04-26 20:30:46|129.0|         7.77|2018|    4|
|77f0f62ff8a596205...|            1|9361d7d8750b24496...|7d294cf9a6a69dc6a...|    2018-07-20 12:45:23|51.45|        41.34|2018|    7|
|30bf87367ed2e63f6...|            1|ce341595864bee3e3...|1c129092bf23f28a5...|    2018-08-21 14:50:14| 47.9|         9.09|2018|    8|
|54369f1e37831273c...|            2|4d0ec1e9b95fb62f9...|81602



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|24e7eb8286ce5ef52...|            1|6fd08d44046ab994b...|d9a84e1403de8da0c...|    2018-02-28 15:31:07| 39.9|         15.1|2018|    2|
|5f827831438fdab57...|            1|3096d513ef378c2f6...|6560211a19b47992c...|    2018-04-26 20:30:46|129.0|         7.77|2018|    4|
|77f0f62ff8a596205...|            1|9361d7d8750b24496...|7d294cf9a6a69dc6a...|    2018-07-20 12:45:23|51.45|        41.34|2018|    7|
|30bf87367ed2e63f6...|            1|ce341595864bee3e3...|1c129092bf23f28a5...|    2018-08-21 14:50:14| 47.9|         9.09|2018|    8|
|54369f1e37831273c...|            2|4d0ec1e9b95fb62f9...|81602

                                                                                

In [24]:
# Sorting with orderBy
price_col = col('price')

# most expensive items first
df5.orderBy(price_col.desc()).show(5)

# cheapest items first; equal prices are ordered by freight_value, largest first
df5.orderBy(price_col.asc(), col('freight_value').desc()).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|           18|270516a3f41dc035a...|2709af9587499e95e...|    2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|            8|05b515fdc76e888aa...|2709a

                                                                                

In [25]:
# Group By Operation

# Price statistics shared by both groupings below.
# count/avg/sum/min/max are called with column names, i.e. they are the
# pyspark.sql.functions versions pulled in by the star import, not Python builtins.
price_stats = [
    count('*').alias('total_count'),
    avg('price').alias('avg_price'),
    sum('price').alias('sum_price'),
    min('price').alias('min_price'),
    max('price').alias('max_price'),
]

# grouped by a single column
df5.groupBy('year').agg(*price_stats).show(5)

# grouped by several columns, then sorted chronologically
stats_by_month = df5.groupBy('year', 'month').agg(*price_stats)
stats_by_month.orderBy(col('year').asc(), col('month').asc()).show(5)

                                                                                

+----+-----------+------------------+------------------+---------+---------+
|year|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----------+------------------+------------------+---------+---------+
|2018|      62511|120.08515685239713| 7506643.240000198|     0.85|   6729.0|
|2016|        370| 134.5565405405405|49785.919999999984|      6.0|   1399.0|
|2017|      49765|121.26732804179927| 6034868.580000141|      1.2|   6735.0|
|2020|          4|             86.49|            345.96|    69.99|    99.99|
+----+-----------+------------------+------------------+---------+---------+





+----+-----+-----------+------------------+------------------+---------+---------+
|year|month|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----+-----------+------------------+------------------+---------+---------+
|2016|    9|          4| 48.61750000000001|194.47000000000003|    44.99|     59.5|
|2016|   10|        365| 135.8371232876712| 49580.54999999999|      6.0|   1399.0|
|2016|   12|          1|              10.9|              10.9|     10.9|     10.9|
|2017|    1|        681|117.65747430249627| 80124.73999999996|      2.9|   1999.0|
|2017|    2|       1866|131.82315648445893| 245982.0100000004|      3.9|   6735.0|
+----+-----+-----------+------------------+------------------+---------+---------+
only showing top 5 rows



                                                                                

In [26]:
# Replace nulls with a per-column default (column name -> replacement value)
null_defaults = {'price': 0, 'freight_value': 0}
df5.fillna(null_defaults).show(5)




+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|3aee81e01f24405a4...|            1|dfec64aac9b864b28...|1e8b33f18b4f7598d...|    2018-02-19 16:27:30| 84.9|         9.41|2018|    2|
|4427836afdab8880f...|            1|011ae9863bd5b15e5...|b94cc9f10ddc85e4b...|    2018-03-15 19:31:40|749.9|        27.83|2018|    3|
|2eabf18a592d25bcc...|            1|e53e557d5a159f5aa...|88460e8ebdecbfecb...|    2017-12-27 12:31:29|109.9|         42.8|2017|   12|
|0f2c54530ee6ee50a...|            1|52e9413ed0d3e64b7...|7f2617c58d5d06806...|    2017-09-14 04:50:22|149.9|        18.33|2017|    9|
|628c4dc07e38c6a43...|            1|9ecadb84c81da840d...|1025f

                                                                                

In [27]:
# Sum the price column with an accumulator: rows add to it inside foreach(),
# and the total is read back on the driver.
# Seeded with 0.0 rather than 0 so the accumulator is float-typed like the prices added to it.
accum = spark.sparkContext.accumulator(0.0)


def _add_price(row):
    """Add one row's price to the accumulator, skipping rows whose price is null."""
    price = row['price']
    if price is not None:  # a null price arrives as None and cannot be added to a number
        accum.add(price)


df5.foreach(_add_price)
print(accum.value) #Accessed by driver



13591643.69999946


                                                                                

In [28]:
# CASE WHEN equivalent: when()/otherwise() label each row with a price band
price_col = col('price')
is_high = price_col >= 100
is_medium = (price_col < 100) & (price_col >= 50)

# conditions are tried in order; rows matching neither fall through to "Low"
price_category = when(is_high, "High").when(is_medium, "Medium").otherwise("Low")

df5.withColumn("price_category", price_category).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|a024d7476a847e045...|            1|fbce4c4cb307679d8...|c33847515fa6305ce...|    2018-08-01 03:24:23|149.0|        51.33|2018|    8|          High|
|6dd5fbff4557f2db4...|            1|c2bd03501cf7f4f3a...|cca3071e3e9bb7d12...|    2017-08-16 23:03:44|51.92|        16.12|2017|    8|        Medium|
|aebe4edc785354fcd...|            1|5a3320037d5922a77...|b6d44737c04332870...|    2017-05-05 13:10:21| 75.0|        35.83|2017|    5|        Medium|
|9d817e85739426de1...|            3|6962734c72522e70e...|b92e3c8f9738272ff...|    2018-06-05 00:15:48|169.

                                                                                

In [29]:
# Window functions in 
from pyspark.sql.window import Window

# Both windows group rows by year; they differ only in how rows are ordered inside a year
per_year = Window.partitionBy('year')

# dense_rank of each row by ascending price within its year
windowSpec1 = per_year.orderBy(col('price').asc())
ranked = df5.withColumn('dense_rank', dense_rank().over(windowSpec1))
ranked.show(5)

# cumulative price within each year, following shipping_limit_datetime order
# NOTE(review): no explicit frame is set, so Spark's default frame for an ordered window
# applies - rows with identical timestamps presumably share one running_sum value; confirm
# whether a strict row-by-row total is wanted.
windowSpec2 = per_year.orderBy(col('shipping_limit_datetime').asc())
with_running_sum = df5.withColumn('running_sum', sum('price').over(windowSpec2))
with_running_sum.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|dense_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|f1d5c2e6867fa93ce...|            1|46fce52cef5caa7cc...|2d2322d8421188677...|    2018-08-28 21:30:15|  2.2|         7.39|2018|   

[Stage 129:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|       running_sum|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|b2d1902261f105c5c...|            1|5ed3835ea6f96c77b...|aba1721a889e04dec...|    2018-01-01 22:08:31| 139.0|         8.23|2018|    1|             139.0|
|3c8e80909dd1066fd...|            1|4308439e0d80d5fe0...|59fb871bf6f4522a8...|    2018-01-01 22:13:24|179.99|         27.8|2018|    1|            318.99|
|f2e5bcbd102cd01f1...|            1|2bb3e85f2a403543f...|76d64c4aca3a7baf2...|    2018-01-01 22:27:15| 348.9|       118.06|2018|    1|            667.89|
|2e7080c8c24e4a977...|            1|3bdc89e963c6651b8...|fffd5413c0700ac82..

                                                                                

In [55]:
# Load the sellers dataset: first line is the header row, column types are inferred

hdfs_path = '/tmp/input_data/sellers_dataset.csv'
sdf = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(hdfs_path)
)

# Inspect the inferred types and the first few rows
sdf.printSchema()
sdf.show(5)

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [56]:
# Perform join

# Inner join of order items with sellers on seller_id; sdf is wrapped in broadcast()
same_seller = df5.seller_id == sdf.seller_id
joined = df5.join(broadcast(sdf), same_seller, 'inner')

# Both inputs carry seller_id; drop the sellers-side copy so the column appears once
result1 = joined.drop(sdf.seller_id)

result1.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+--------------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|         seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+--------------------+------------+
|650d7fd7f019e7381...|            1|cc9c93a7dc6ba4b59...|a3a38f4affed601eb...|    2017-08-03 01:45:14| 89.9|        35.95|2017|    8|                 89204|           joinville|          SC|
|3aee81e01f24405a4...|            1|dfec64aac9b864b28...|1e8b33f18b4f7598d...|    2018-02-19 16:27:30| 84.9|         9.41|2018|    2|                  2066|           sao paulo|          SP|
|363524b17966c3a64...|            2|43ee88561

In [41]:
# Same inner join, but the two sides are referred to through DataFrame aliases
order_items = df5.alias('oid')
sellers = sdf.alias('sid')
same_seller = col('oid.seller_id') == col('sid.seller_id')

# drop the sellers-side seller_id so the join key appears only once
result2 = order_items.join(sellers, same_seller, 'inner').drop(col('sid.seller_id'))

result2.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+--------------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|         seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+--------------------+------------+
|650d7fd7f019e7381...|            1|cc9c93a7dc6ba4b59...|a3a38f4affed601eb...|    2017-08-03 01:45:14| 89.9|        35.95|2017|    8|                 89204|           joinville|          SC|
|3aee81e01f24405a4...|            1|dfec64aac9b864b28...|1e8b33f18b4f7598d...|    2018-02-19 16:27:30| 84.9|         9.41|2018|    2|                  2066|           sao paulo|          SP|
|363524b17966c3a64...|            2|43ee88561

In [39]:
# Spark SQL: register both DataFrames as temporary views, then join them with a query
for view_name, frame in (("ORDER_ITEM", df5), ("SELLERS", sdf)):
    frame.createOrReplaceTempView(view_name)


join_query = "select * from ORDER_ITEM oid INNER JOIN SELLERS sid ON oid.seller_id == sid.seller_id"
joinDF2 = spark.sql(join_query)

In [42]:
# Write data in HDFS without any partition key.
# mode('overwrite') replaces whatever an earlier run left at this path; without an explicit
# mode the write raises when the directory already exists, so the cell could not be re-run.
(
    result1.write
    .mode('overwrite')
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .save('/tmp/output_data/result1/')
)
print("Write Successfull")


                                                                                

In [43]:
# Write data in HDFS with partition key (partitioned by year).
# mode('overwrite') replaces whatever an earlier run left at this path; without an explicit
# mode the write raises when the directory already exists, so the cell could not be re-run.
(
    result1.write
    .mode('overwrite')
    .partitionBy('year')
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .save('/tmp/output_data/result2/')
)
print("Write Successfull")



Write Successfull


                                                                                

In [44]:
# Write data in HDFS into single file: coalesce(1) collapses the DataFrame to one
# partition before the write.
# mode('overwrite') replaces whatever an earlier run left at this path; without an explicit
# mode the write raises when the directory already exists, so the cell could not be re-run.
(
    result1.coalesce(1).write
    .mode('overwrite')
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .save('/tmp/output_data/result3/')
)
print("Write Successfull")

[Stage 168:>                                                        (0 + 1) / 1]

Write Successfull


                                                                                

In [60]:
# Append the joined result to a Hive table partitioned by year

# Hive dynamic-partition setting, issued as a SQL SET statement
spark.sql("""set hive.exec.dynamic.partition.mode=nonstrict""")

# Work inside the tables_by_spark database
spark.sql("""USE tables_by_spark""")

# Create the partitioned target table on first use
spark.sql("""
    CREATE TABLE IF NOT EXISTS order_sellers_data (
        order_id STRING,
        order_item_id INT,
        product_id STRING,
        price DOUBLE,
        freight_value DOUBLE,
        seller_city STRING
    ) PARTITIONED BY (year INT)
""")

# Columns listed in the same order as the table definition, partition column (year) last
hive_columns = [
    'order_id',
    'order_item_id',
    'product_id',
    'price',
    'freight_value',
    'seller_city',
    'year',
]

# mode("append") adds rows to the table, so re-running this cell inserts them again
hive_rows = result1.select(*hive_columns)
hive_rows.write.mode("append").insertInto("order_sellers_data")

23/07/21 13:01:10 WARN SetCommand: 'SET hive.exec.dynamic.partition.mode=nonstrict' might not work, since Spark doesn't support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
23/07/21 13:01:10 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
                                                                                