In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

In [5]:
spark=SparkSession.builder \
    .appName("Spark") \
    .enableHiveSupport() \
    .getOrCreate()

In [6]:
spark

In [8]:
schema=StructType([
    StructField("order_id ",StringType(),True),
    StructField("order_item_id ",IntegerType(),True),
    StructField("product_id ",StringType(),True),
    StructField("seller_id",StringType(),True),
    StructField("shipping_limit_date",TimestampType(),True),
    StructField("price",DoubleType(),True),
    StructField("freight_value",DoubleType(),True),
])

path="data/order_items_dataset.csv"
df=spark.read.format('csv').option('header','true').option('inferSchema','false').schema(schema).load(path)

df.printSchema()
df.show(5)


root
 |-- order_id : string (nullable = true)
 |-- order_item_id : integer (nullable = true)
 |-- product_id : string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01

In [7]:
df.rdd.getNumPartitions()

4

In [8]:
df_repartitions=df.repartition(10)

In [10]:
df_repartitions.rdd.getNumPartitions()

10

In [12]:
from pyspark.sql.functions import *

In [9]:
df.select('order_id ').show(5,truncate=False)

+--------------------------------+
|order_id                        |
+--------------------------------+
|00010242fe8c5a6d1ba2dd792cb16214|
|00018f77f2f0320c557190d7a144bdd3|
|000229ec398224ef6ca0657da4fc703e|
|00024acbcdf0a6daa1e931b038114c75|
|00042b26cf59d7ce69dfabb4e55b4fd9|
+--------------------------------+
only showing top 5 rows



In [13]:
df2=df.withColumn("year",year(col("shipping_limit_date"))).withColumn("month",month(col("shipping_limit_date")))
df2.show(5,truncate=False)

+--------------------------------+--------------+--------------------------------+--------------------------------+-------------------+-----+-------------+----+-----+
|order_id                        |order_item_id |product_id                      |seller_id                       |shipping_limit_date|price|freight_value|year|month|
+--------------------------------+--------------+--------------------------------+--------------------------------+-------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1ba2dd792cb16214|1             |4244733e06e7ecb4970a6e2683c13e61|48436dade18ac8b2bce089ec2a041202|2017-09-19 09:45:35|58.9 |13.29        |2017|9    |
|00018f77f2f0320c557190d7a144bdd3|1             |e5f2d52b802189ee658865ca93d83a8f|dd7ddc04e1b6c2c614352b383efe2d36|2017-05-03 11:05:13|239.9|19.93        |2017|5    |
|000229ec398224ef6ca0657da4fc703e|1             |c777355d18b72b67abbeef9df44fd0fd|5b51032eddd242adc84c38acab88f23d|2018-01-18 14:48:30|199.0|17.87        |2018|1    

In [20]:
order_li=["00010242fe8c5a6d1ba2dd792cb16214","00018f77f2f0320c557190d7a144bdd3"]

df2.filter(col("order_id ").isin(order_li)).show()

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|2017|    5|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+



In [21]:
df2.drop('month').show(5)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|2017|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|2017|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|2018|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|2018|
|00042b26cf59d7ce6...|             1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|2017|


In [23]:
df2.dropDuplicates(['order_id ','order_item_id ']).show(10)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|2017|    5|
|0015ebb40fb17286b...|             1|50fd2b788dc166edd...|8b321bb669392f516...|2018-01-18 09:11:24| 21.9|         15.1|2018|    1|
|001dbc16dc51075e9...|             1|777d2e438a1b645f3...|4a3ca9315b744ce9f...|2017-02-01 13:17:57| 69.9|         18.0|2017|    2|
|0028de0ca693a1bb2...|             1|059344baebbeaa42f...|955fee9216a65b617...|2018-08-21 03:35:17|29.99|        15.31|2018|    8|
|002c9def9c9b951b1...|             1|2d9ff06c8870a518f...|00720abe85ba08598...|2018

In [24]:
df2.orderBy(col('price').asc(),col('freight_value').desc()).show(10)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|             2|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|6e864b3f0ec710311...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|3ee6513ae7ea23bdf...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|            19|270516a3f41dc035a...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|            10|05b515fdc76e888aa...|2709af9587499e95e...|2017

In [29]:
df2.groupBy('year','month').agg(count('*'),avg('price'),min('price'),max('price')).orderBy(col('year').asc(),col('month').desc()).show()

+----+-----+--------+------------------+----------+----------+
|year|month|count(1)|        avg(price)|min(price)|max(price)|
+----+-----+--------+------------------+----------+----------+
|2016|   12|       1|              10.9|      10.9|      10.9|
|2016|   10|     365|135.83712328767106|       6.0|    1399.0|
|2016|    9|       4|           48.6175|     44.99|      59.5|
|2017|   12|    7726|116.35011390111136|       4.4|    3124.0|
|2017|   11|    7355|120.10219306593969|      3.85|    2990.0|
|2017|   10|    5189|126.81060512622734|       4.5|   2999.99|
|2017|    9|    4724|130.61941574936384|      2.29|    1798.0|
|2017|    8|    5042|111.08554938516372|       3.9|    2649.0|
|2017|    7|    4116|113.04229834791019|       1.2|   2999.89|
|2017|    6|    3801|123.38885819521138|      3.49|    6499.0|
|2017|    5|    4150|121.84468915662596|       3.5|    4690.0|
|2017|    4|    2364|130.35038917089682|       4.9|    4799.0|
|2017|    3|    2751|124.77011995637947|       4.9|    

In [30]:

# Working with different partitions
accum=spark.sparkContext.accumulator(0)
df2.foreach(lambda row:accum.add(row['price']))

print(accum.value)

13591643.699999392


In [15]:
df2.withColumn("price_category", when(col('price') >= 100 , "High")
               .when((col('price') < 100) & (col('price')>=50),"Medium")
               .otherwise("Low")).show(5)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+--------------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|price_category|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+--------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|2017|    9|        Medium|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|2017|    5|          High|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|2018|    1|          High|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|2018|

In [16]:
from pyspark.sql.window import Window

In [17]:
windowspec=Window.partitionBy('year').orderBy(col('price').asc())

In [28]:
df3=df2.withColumn("dense_rank",dense_rank().over(windowspec))

In [34]:
df3.show()

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|dense_rank|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|             2|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|f1d5c2e6867fa93ce...|             1|46fce52cef5caa7cc...|2d2322d8421188677...|2018-08-28 21:30:15|  2.2|         7.39|2018|    8|         2|
|de03f

In [39]:
df3.select("year").distinct().show()

+----+
|year|
+----+
|2018|
|2020|
|2016|
|2017|
+----+



In [41]:
df3.filter(df3['year']==2020).show(10)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|dense_rank|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|13bdf405f961a6dee...|             1|96ea060e41bdecc64...|7a241947449cc45db...|2020-02-05 03:30:51|69.99|        14.66|2020|    2|         1|
|9c94a4ea2f7876660...|             1|282b126b2354516c5...|7a241947449cc45db...|2020-02-03 20:23:22|75.99|         14.7|2020|    2|         2|
|c2bb89b5c1dd978d5...|             1|87b92e06b320e803d...|7a241947449cc45db...|2020-04-09 22:35:08|99.99|        61.44|2020|    4|         3|
|c2bb89b5c1dd978d5...|             2|87b92e06b320e803d...|7a241947449cc45db...|2020-04-09 22:35:08|99.99|        61.44|2020|    4|         3|
+-----

In [42]:
df4=df2.withColumn("dense_rank",rank().over(windowspec))

In [43]:
df4.show(20)

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|dense_rank|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|             2|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|f1d5c2e6867fa93ce...|             1|46fce52cef5caa7cc...|2d2322d8421188677...|2018-08-28 21:30:15|  2.2|         7.39|2018|    8|         4|
|de03f

In [44]:
df5=df2.withColumn("dense_rank",row_number().over(windowspec))

In [45]:
df5.show()

+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date|price|freight_value|year|month|dense_rank|
+--------------------+--------------+--------------------+--------------------+-------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|             1|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         2|
|c5bdd8ef3c0ec4202...|             2|8a3254bee785a526d...|96804ea39d96eb908...|2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         3|
|f1d5c2e6867fa93ce...|             1|46fce52cef5caa7cc...|2d2322d8421188677...|2018-08-28 21:30:15|  2.2|         7.39|2018|    8|         4|
|de03f

In [48]:
windowSpec2=Window.partitionBy('year').orderBy(col('shipping_limit_date').asc())
df2.withColumn('running_sum',sum('price').over(windowSpec2)).show()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+------------------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date| price|freight_value|year|month|       running_sum|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+------------------+
|b2d1902261f105c5c...|             1|5ed3835ea6f96c77b...|aba1721a889e04dec...|2018-01-01 22:08:31| 139.0|         8.23|2018|    1|             139.0|
|3c8e80909dd1066fd...|             1|4308439e0d80d5fe0...|59fb871bf6f4522a8...|2018-01-01 22:13:24|179.99|         27.8|2018|    1|            318.99|
|f2e5bcbd102cd01f1...|             1|2bb3e85f2a403543f...|76d64c4aca3a7baf2...|2018-01-01 22:27:15| 348.9|       118.06|2018|    1|            667.89|
|2e7080c8c24e4a977...|             1|3bdc89e963c6651b8...|fffd5413c0700ac82...|2018-01-01 22:3

In [49]:
seller_df=spark.read.format('csv').option('header','true').option('inferSchema','true').load('data/sellers_dataset.csv')

In [50]:
seller_df.show(5)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [56]:
order_seller_df=df2.join(seller_df,on="seller_id")

In [57]:
order_seller_df.show(10)

+--------------------+--------------------+--------------+--------------------+-------------------+------+-------------+----+-----+----------------------+-------------------+------------+
|           seller_id|           order_id |order_item_id |         product_id |shipping_limit_date| price|freight_value|year|month|seller_zip_code_prefix|        seller_city|seller_state|
+--------------------+--------------------+--------------+--------------------+-------------------+------+-------------+----+-----+----------------------+-------------------+------------+
|48436dade18ac8b2b...|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|2017-09-19 09:45:35|  58.9|        13.29|2017|    9|                 27277|      volta redonda|          SP|
|dd7ddc04e1b6c2c61...|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|2017-05-03 11:05:13| 239.9|        19.93|2017|    5|                  3471|          sao paulo|          SP|
|5b51032eddd242adc...|000229ec398224ef6...|             1|c7

In [68]:
order_seller_df.groupBy('seller_id','seller_state').agg(count('*').alias("total_sales")).orderBy(col("total_sales").desc()).show()

+--------------------+------------+-----------+
|           seller_id|seller_state|total_sales|
+--------------------+------------+-----------+
|6560211a19b47992c...|          SP|       2033|
|4a3ca9315b744ce9f...|          SP|       1987|
|1f50f920176fa81da...|          SP|       1931|
|cc419e0650a3c5ba7...|          SP|       1775|
|da8622b14eb17ae28...|          SP|       1551|
|955fee9216a65b617...|          SP|       1499|
|1025f0e2d44d7041d...|          SP|       1428|
|7c67e1448b00f6e96...|          SP|       1364|
|ea8482cd71df3c196...|          SP|       1203|
|7a67c85e85bb2ce85...|          SP|       1171|
|4869f7a5dfa277a7d...|          SP|       1156|
|3d871de0142ce09b7...|          SP|       1147|
|8b321bb669392f516...|          SP|       1018|
|cca3071e3e9bb7d12...|          SP|        830|
|620c87c171fb2a6dd...|          RJ|        798|
|a1043bafd471dff53...|          MG|        770|
|e9779976487b77c6d...|          SP|        750|
|f8db351d8c4c4c22c...|          SP|     

## Broadcast Join

In [70]:
result= df2.join(broadcast(seller_df),df2.seller_id==seller_df.seller_id,'inner').drop(seller_df.seller_id)

In [71]:
result.show(10)

+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+----------------------+-------------------+------------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date| price|freight_value|year|month|seller_zip_code_prefix|        seller_city|seller_state|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+----------------------+-------------------+------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|2017|    9|                 27277|      volta redonda|          SP|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|2017|    5|                  3471|          sao paulo|          SP|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b

### SQL

In [74]:
df2.createOrReplaceTempView("order")
seller_df.createOrReplaceTempView("seller")



In [75]:
join_df=spark.sql("select * from order inner join seller on order.seller_id == seller.seller_id")

In [77]:
join_df.show()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+--------------------+----------------------+--------------------+------------+
|           order_id |order_item_id |         product_id |           seller_id|shipping_limit_date| price|freight_value|year|month|           seller_id|seller_zip_code_prefix|         seller_city|seller_state|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-------------+----+-----+--------------------+----------------------+--------------------+------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|2017|    9|48436dade18ac8b2b...|                 27277|       volta redonda|          SP|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|2017|    5|dd7ddc04e1b6c2c61...|        

In [80]:
order_seller_df.write.format("csv").option('header','true').option('delimiter',',').save('/home/jovyan/work/output/join_csv')
print("Write successully")

Write successully


#### Hive

In [86]:
spark.sql(""" set hive.exec.dynamic.partition.mode=nonstrict""")
spark.sql("CREATE DATABASE IF NOT EXISTS tables_by_spark")
spark.sql("USE tables_by_spark")
spark.sql("""
        CREATE TABLE IF NOT EXISTS order_seller_data(
            order_id STRING,
            product_id STRING,
            price DOUBLE,
            freight_value DOUBLE,
            seller_city STRING
        ) PARTITIONED BY (year INT)
""")

join_df.select('order_id ','product_id ','price','freight_value','seller_city','year').write.mode("append").insertInto("order_seller_data")