In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime


spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .enableHiveSupport() \
    .getOrCreate()


data = [
    ["Product A", 1001, datetime.strptime("2023-07-20", "%Y-%m-%d"), datetime.strptime("2023-07-20 10:15:30", "%Y-%m-%d %H:%M:%S"), 29.99],
    ["Product B", 1002, datetime.strptime("2023-07-19", "%Y-%m-%d"), datetime.strptime("2023-07-19 14:20:45", "%Y-%m-%d %H:%M:%S"), 49.99],
    ["Product C", 1003, datetime.strptime("2023-07-18", "%Y-%m-%d"), datetime.strptime("2023-07-18 09:30:15", "%Y-%m-%d %H:%M:%S"), 39.99],
    ["Product D", 1004, datetime.strptime("2023-07-17", "%Y-%m-%d"), datetime.strptime("2023-07-17 16:45:00", "%Y-%m-%d %H:%M:%S"), 19.99]
]


schema = StructType([
    StructField("Product", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Price", FloatType(), True)
])


df = spark.createDataFrame(data, schema)


df.printSchema()


df.show()

24/08/27 03:15:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)



                                                                                

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



In [3]:
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_item_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("shipping_limit_date", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("freight_value", DoubleType(), True)
])

hdfs_path = '/tmp/input_data/order_items_dataset.csv'
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false').schema(schema).load(hdfs_path)

df.printSchema()

df.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

                                                                                

In [4]:
hdfs_path = '/tmp/input_data/order_items_dataset.csv'
df2 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)


df2.printSchema()
df2.show(5)

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48

In [5]:
print(f'Number of partitions: {df2.rdd.getNumPartitions()}')

df3 = df2.repartition(10)


print(f'Number of partitions: {df3.rdd.getNumPartitions()}')

Number of partitions: 2




Number of partitions: 10


In [6]:
from pyspark.sql.functions import *

df3.select('order_id').show(5)
df3.select('order_id', 'shipping_limit_date').show(5)
df3.select(col('order_id'), col('shipping_limit_date')).show(5)
df3.select(col('order_id').alias('oid'), col('shipping_limit_date').alias('limit_date')).show(5)

                                                                                

+--------------------+
|            order_id|
+--------------------+
|6299bb8e855289b41...|
|71fbb9971d84bf97a...|
|74322a01b770c2ea3...|
|a23fc2b3af4f1a48e...|
|747af114bbea56ac1...|
+--------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|
|51c3d73e0e9052253...|2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|f0b47dadd5f372c41...|2018-05-14 04:54:53|
|bbd319ae8e4b46101...|2017-11-24 02:28:04|
|e16fb24453a306d5d...|2018-08-10 03:24:54|
|d4de6d0debe2df72c...|2017-03-13 03:35:12|
|bdbe8da70dcc6e6a2...|2018-04-22 21:52:25|
+--------------------+-------------------+
only showing top 5 rows





+--------------------+-------------------+
|                 oid|         limit_date|
+--------------------+-------------------+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|
|51c3d73e0e9052253...|2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [7]:
df4 = df3.withColumn("year", year(col("shipping_limit_date"))).withColumn("month", month(col("shipping_limit_date")))

df4.select("order_id", "shipping_limit_date", "year", "month").show(5)



+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|2017|   11|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|2018|    6|
|51c3d73e0e9052253...|2018-02-22 19:15:27|2018|    2|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|2018|    2|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|2018|    3|
+--------------------+-------------------+----+-----+
only showing top 5 rows



                                                                                

In [8]:
df5 = df4.withColumnRenamed('shipping_limit_date', 'shipping_limit_datetime')

df5.select("order_id", "shipping_limit_datetime").show(5)



+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|3bbf8f927f288e4a1...|    2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|    2018-06-14 09:52:04|
|51c3d73e0e9052253...|    2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|    2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|    2018-03-27 17:28:20|
+--------------------+-----------------------+
only showing top 5 rows



                                                                                

In [9]:
df5.filter(col("order_id") == '00010242fe8c5a6d1ba2dd792cb16214').show(5)

order_li = ['00010242fe8c5a6d1ba2dd792cb16214','00018f77f2f0320c557190d7a144bdd3']
df5.filter(col("order_id").isin(order_li)).show(5)

df5.filter((col("price") < 50) & (col("freight_value")  < 10)).show(5)


df5.filter("price < 50 and freight_value  < 10").show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+



                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|    5|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+



                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|363524b17966c3a64...|            2|43ee88561093499d9...|23613d49c3ac2bd30...|    2018-05-24 22:35:14| 10.9|          3.8|2018|    5|
|1d9609dad08db33f3...|            1|7cc67695a7648efc5...|95e03ca3d4146e401...|    2017-12-11 18:10:31|29.99|          8.9|2017|   12|
|50aff4b82439e01c5...|            1|ec1faa2edc27ce323...|cc419e0650a3c5ba7...|    2017-11-23 21:53:21|29.99|         7.78|2017|   11|
|37ee401157a3a0b28...|            9|d34c07a2d817ac73f...|e7d5b006eb624f130...|    2018-04-19 02:30:52|29.99|         7.39|2018|    4|
|8f5fac100b291e3c7...|            1|0e996644bf2835621...|b4ffb

In [10]:
df5.show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    2017-03-21 21:24:27|  19.9|         20.8|2017|    3|
|23d16dddab46fd3d0...|            1|cca8e09ba6f2d35e4...|43f8c9950d11ecd03...|    2018-01-31 22:17:51|109.99|        14.52|2018|    1|
|71c0d1686c9b55563...|            2|eb6c2ecde53034fc9..

                                                                                

In [11]:
df5.drop('month').show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    2017-03-21 21:24:27|  19.9|         20.8|2017|
|23d16dddab46fd3d0...|            1|cca8e09ba6f2d35e4...|43f8c9950d11ecd03...|    2018-01-31 22:17:51|109.99|        14.52|2018|
|71c0d1686c9b55563...|            2|eb6c2ecde53034fc9...|1025f0e2d44d7041d...|    2017-12-01 19:3

                                                                                

In [12]:
df5.dropDuplicates(['order_id', 'order_item_id']).show(5)

[Stage 48:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13| 239.9|        19.93|2017|    5|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30| 199.0|        17.87|2018|    1|
|00048cc3ae777c65d...|            1|ef92defde845ab845...|6426d21aca402a131...|    2017-05-23 03:55:27|  21.9|        12.69|2017|    5|
|0005a1a1728c9d785...|            1|310ae3c140ff94b03...|a416b6a846a117243...|    2018-03-26 18:31:29|145.95|        11.65|2018|    3|
|0005f50442cb953dc...|            1|4535b0e1091c278df..

                                                                                

In [13]:
df5.distinct().show(5)

df5.dropDuplicates().show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|f5c5b9b21c9e6cf7f...|            1|0521fe3eb04940304...|e99e927c81e3f5173...|    2017-11-21 03:46:34|104.53|        17.21|2017|   11|
|6e77d9428b5dec0e2...|            1|d1c427060a0f73f6b...|a1043bafd471dff53...|    2018-04-25 11:15:31| 119.0|        19.74|2018|    4|
|64320e0f5e1ada4c0...|            1|7ce94ab189134e2d3...|8b321bb669392f516...|    2017-12-21 23:14:42| 13.65|         7.78|2017|   12|
|9fe28378e5a0c6b3f...|            2|3938defa878985e56...|412a4720f3e9431b4...|    2018-07-19 04:31:15| 168.0|        60.91|2018|    7|
|5697a446ff63940af...|            1|9c7140bb02241a583..

[Stage 60:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|638913622370c1dfe...|            1|8b50a72d52d7a91fb...|9f505651f4a6abe90...|    2018-08-02 11:45:14|49.99|         7.61|2018|    8|
|845af9472cc76e66f...|            1|781afe929e3016a66...|08633c14ef2db992c...|    2017-12-19 09:30:52|107.9|        15.51|2017|   12|
|13ac325ed34b96835...|            1|62dbbad1385feb9b2...|4d6d651bd7684af3f...|    2018-06-11 08:51:53|250.0|        20.72|2018|    6|
|8e3cbe2ec233a68c1...|            1|c9dbe2eec19a8093c...|cca3071e3e9bb7d12...|    2017-10-19 21:28:18| 96.9|        17.93|2017|   10|
|6649ebec1c6f3e185...|            4|42189544021ccb736...|b18dc

24/08/27 03:53:21 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_57_piece0 !
24/08/27 03:53:21 WARN BlockManagerMaster: Failed to remove broadcast 57 with removeFromMaster = true - org.apache.spark.SparkException: Could not find BlockManagerEndpoint1.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:178)
	at org.apache.spark.rpc.netty.Dispatcher.postRemoteMessage(Dispatcher.scala:136)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:683)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRea

In [14]:
df5.orderBy(col('price').desc()).show(5)

df5.orderBy(col('price').asc(), col('freight_value').desc()).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|            1|270516a3f41dc035a...|2709af9587499e95e...|    2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|            4|05b515fdc76e888aa...|2709a

                                                                                

In [15]:

df5.groupBy('year').agg(count('*').alias('total_count'), 
                        avg('price').alias('avg_price'), 
                        sum('price').alias('sum_price'),
                        min('price').alias('min_price'),
                        max('price').alias('max_price')).show(5)


df5.groupBy('year', 'month').agg(count('*').alias('total_count'), 
                        avg('price').alias('avg_price'), 
                        sum('price').alias('sum_price'),
                        min('price').alias('min_price'),
                        max('price').alias('max_price')).orderBy(col('year').asc(),col('month').asc()).show(5)

                                                                                

+----+-----------+------------------+-----------------+---------+---------+
|year|total_count|         avg_price|        sum_price|min_price|max_price|
+----+-----------+------------------+-----------------+---------+---------+
|2018|      62511|120.08515685239729|7506643.240000207|     0.85|   6729.0|
|2017|      49765|121.26732804179925| 6034868.58000014|      1.2|   6735.0|
|2016|        370|134.55654054054054|         49785.92|      6.0|   1399.0|
|2020|          4|             86.49|           345.96|    69.99|    99.99|
+----+-----------+------------------+-----------------+---------+---------+





+----+-----+-----------+------------------+------------------+---------+---------+
|year|month|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----+-----------+------------------+------------------+---------+---------+
|2016|    9|          4| 48.61750000000001|194.47000000000003|    44.99|     59.5|
|2016|   10|        365|135.83712328767123|49580.549999999996|      6.0|   1399.0|
|2016|   12|          1|              10.9|              10.9|     10.9|     10.9|
|2017|    1|        681|117.65747430249625| 80124.73999999995|      2.9|   1999.0|
|2017|    2|       1866| 131.8231564844589|245982.01000000033|      3.9|   6735.0|
+----+-----+-----------+------------------+------------------+---------+---------+
only showing top 5 rows



                                                                                

In [16]:
df5.fillna({'price': 0, 'freight_value': 0}).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    2017-03-21 21:24:27|  19.9|         20.8|2017|    3|
|23d16dddab46fd3d0...|            1|cca8e09ba6f2d35e4...|43f8c9950d11ecd03...|    2018-01-31 22:17:51|109.99|        14.52|2018|    1|
|71c0d1686c9b55563...|            2|eb6c2ecde53034fc9..

                                                                                

In [17]:
accum=spark.sparkContext.accumulator(0)

df5.foreach(lambda row: accum.add(row['price']))
print(accum.value) #Accessed by driver



13591643.699999437


                                                                                

In [18]:
df5.withColumn("price_category",  when(col('price') >= 100, "High")
                                 .when((col('price') < 100) & (col('price') >= 50), "Medium")
                                 .otherwise("Low")).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|cfe5da82479414589...|            1|6f3b5b605d91b7439...|4869f7a5dfa277a7d...|    2018-08-06 22:15:29|159.9|        13.78|2018|    8|          High|
|af3ad705f2a257c86...|            1|694e3712264584430...|744dac408745240a2...|    2017-11-20 15:11:01|249.0|        26.77|2017|   11|          High|
|c2eaa6cab239c2145...|            1|bbdb487c5f9a780a6...|aaed1309374718fdd...|    2017-04-27 15:15:20| 99.9|        26.96|2017|    4|        Medium|
|e86d50789574a28a9...|            1|00d93a09990b319a7...|391fc6631aebcf300...|    2017-11-30 12:13:41|169.

                                                                                

In [19]:
from pyspark.sql.window import Window

windowSpec1 = Window.partitionBy('year').orderBy(col('price').asc())
df5.withColumn('dense_rank', dense_rank().over(windowSpec1)).show(5)

windowSpec2 = Window.partitionBy('year').orderBy(col('shipping_limit_datetime').asc())
df5.withColumn('running_sum', sum('price').over(windowSpec2)).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|dense_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|f1d5c2e6867fa93ce...|            1|46fce52cef5caa7cc...|2d2322d8421188677...|    2018-08-28 21:30:15|  2.2|         7.39|2018|   

[Stage 99:>                                                         (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|       running_sum|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+------------------+
|b2d1902261f105c5c...|            1|5ed3835ea6f96c77b...|aba1721a889e04dec...|    2018-01-01 22:08:31| 139.0|         8.23|2018|    1|             139.0|
|3c8e80909dd1066fd...|            1|4308439e0d80d5fe0...|59fb871bf6f4522a8...|    2018-01-01 22:13:24|179.99|         27.8|2018|    1|            318.99|
|f2e5bcbd102cd01f1...|            1|2bb3e85f2a403543f...|76d64c4aca3a7baf2...|    2018-01-01 22:27:15| 348.9|       118.06|2018|    1|            667.89|
|2e7080c8c24e4a977...|            1|3bdc89e963c6651b8...|fffd5413c0700ac82..

                                                                                

In [20]:
hdfs_path = '/tmp/input_data/sellers_dataset.csv'
sdf = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)


sdf.printSchema()
sdf.show(5)

                                                                                

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [21]:
result1 = df5.join(broadcast(sdf), df5.seller_id == sdf.seller_id, 'inner').drop(sdf.seller_id)

result1.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|                  1026|  sao paulo|          SP|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    201

In [22]:
result2 = df5.alias('oid').join(sdf.alias('sid'), col('oid.seller_id') == col('sid.seller_id'), 'inner').drop(col('sid.seller_id'))

result2.show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|                  1026|  sao paulo|          SP|
|025c72e88fbf2358b...|            2|bef21943bc2335188...|e49c26c3edfa46d22...|    201

                                                                                

In [24]:
df5.createOrReplaceTempView("ORDER_ITEM")
sdf.createOrReplaceTempView("SELLERS")


joinDF2 = spark.sql("select * from ORDER_ITEM oid INNER JOIN SELLERS sid ON oid.seller_id == sid.seller_id")

joinDF2.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------------+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+--------------------+----------------------+-----------+------------+
|1e1bb536916a99649...|            2|0288f8dd74b931b4e...|1da3aeb70d7989d1e...|    2017-09-05 12:10:11| 49.99|        21.15|2017|    9|1da3aeb70d7989d1e...|                  4265|  sao paulo|          SP|
|62a0e822dd605871a...|            1|31dbb0d1815bdc83c...|6da1992f915d77be9...|    2017-06-08 11:50:18|  29.0|        15.79|2017|    6|6da1992f915d77be9...|                  1026|  sao 

In [25]:
result1.write.format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result1/')
print("Write Successfull")



Write Successfull


                                                                                

In [26]:
result1.write.partitionBy('year').format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result2/')
print("Write Successfull")



Write Successfull


                                                                                

In [27]:
result1.coalesce(1).write.format('csv').option('header', 'true').option('delimiter', ',').save('/tmp/output_data/result3/')
print("Write Successfull")

[Stage 126:>                                                        (0 + 1) / 1]

Write Successfull


                                                                                

In [28]:
spark.sql("""set hive.exec.dynamic.partition.mode=nonstrict""")

spark.sql("""USE tables_by_spark""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS order_sellers_data (
        order_id STRING,
        order_item_id INT,
        product_id STRING,
        price DOUBLE,
        freight_value DOUBLE,
        seller_city STRING
    ) PARTITIONED BY (year INT)
""")


result1.select('order_id',
              'order_item_id',
              'product_id',
              'price',
              'freight_value',
              'seller_city',
              'year').write.mode("append").insertInto("order_sellers_data")
print("write successfull")

24/08/27 04:42:31 WARN SetCommand: 'SET hive.exec.dynamic.partition.mode=nonstrict' might not work, since Spark doesn't support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
24/08/27 04:42:33 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
24/08/27 04:42:33 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                      

write successfull
