In [3]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [6]:
spark

In [7]:
spark.sparkContext

In [10]:
df = spark.read.parquet('c:/opt/spark-3.1.1-bin-hadoop2.7/examples/src/main/resources/users.parquet')

In [11]:
df

DataFrame[name: string, favorite_color: string, favorite_numbers: array<int>]

In [12]:
spark.read

<pyspark.sql.readwriter.DataFrameReader at 0x23fb1a5bd30>

In [14]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [15]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [18]:
df.write.partitionBy("favorite_color").parquet('c:/opt/spark-3.1.1-bin-hadoop2.7/examples/src/main/resources/users_by_color.parquet')

In [19]:
df.select("name").show()

+------+
|  name|
+------+
|Alyssa|
|   Ben|
+------+



In [20]:
categories = spark.read.parquet("d:/uwm/categories")

In [21]:
categories

DataFrame[category_id: int, category_department_id: int, category_name: string]

In [22]:
categories = spark.read.jdbc("d:/uwm/categories")
departments = spark.read.parquet("d:/uwm/departments")
order_items = spark.read.parquet("d:/uwm/order_items")
orders = spark.read.parquet("d:/uwm/orders")
products = spark.read.parquet("d:/uwm/products")

In [23]:
categories.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_department_id: integer (nullable = true)
 |-- category_name: string (nullable = true)



In [24]:
departments.printSchema()

root
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)



In [25]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [26]:
orders.schema

StructType(List(StructField(order_id,IntegerType,true),StructField(order_date,LongType,true),StructField(order_customer_id,IntegerType,true),StructField(order_status,StringType,true)))

In [27]:
categories.show()

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
|          6|                     2|   Tennis & Racquet|
|          7|                     2|             Hockey|
|          8|                     2|        More Sports|
|          9|                     3|   Cardio Equipment|
|         10|                     3|  Strength Training|
|         11|                     3|Fitness Accessories|
|         12|                     3|       Boxing & MMA|
|         13|                     3|        Electronics|
|         14|                     3|     Yoga & Pilates|
|         15|                  

In [28]:
categories.toPandas()

Unnamed: 0,category_id,category_department_id,category_name
0,1,2,Football
1,2,2,Soccer
2,3,2,Baseball & Softball
3,4,2,Basketball
4,5,2,Lacrosse
...,...,...,...
111,54,8,MLS
112,55,8,International Soccer
113,56,8,World Cup Shop
114,57,8,MLB Players


In [29]:
categories.count()

116

In [32]:
categories.select("category_name").distinct().count()

55

In [33]:
order_items.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [34]:
order_items.toPandas()

Unnamed: 0,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
0,1,1,957,1,299.980011,299.980011
1,2,2,1073,1,199.990005,199.990005
2,3,2,502,5,250.000000,50.000000
3,4,2,403,1,129.990005,129.990005
4,5,4,897,2,49.980000,24.990000
...,...,...,...,...,...,...
344391,172194,68881,403,1,129.990005,129.990005
344392,172195,68882,365,1,59.990002,59.990002
344393,172196,68882,502,1,50.000000,50.000000
344394,172197,68883,208,1,1999.989990,1999.989990


In [35]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [36]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [37]:
orders.toPandas()

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,1374735600000,11599,CLOSED
1,2,1374735600000,256,PENDING_PAYMENT
2,3,1374735600000,12111,COMPLETE
3,4,1374735600000,8827,CLOSED
4,5,1374735600000,11318,COMPLETE
...,...,...,...,...
137761,68879,1404889200000,778,COMPLETE
137762,68880,1405234800000,1117,COMPLETE
137763,68881,1405753200000,2518,PENDING_PAYMENT
137764,68882,1406012400000,10000,ON_HOLD


In [38]:
orders.select("order_status").distinct().toPandas()

Unnamed: 0,order_status
0,PENDING_PAYMENT
1,COMPLETE
2,ON_HOLD
3,PAYMENT_REVIEW
4,PROCESSING
5,CLOSED
6,SUSPECTED_FRAUD
7,PENDING
8,CANCELED


In [39]:
orders.select("order_status").distinct().collect()

[Row(order_status='PENDING_PAYMENT'),
 Row(order_status='COMPLETE'),
 Row(order_status='ON_HOLD'),
 Row(order_status='PAYMENT_REVIEW'),
 Row(order_status='PROCESSING'),
 Row(order_status='CLOSED'),
 Row(order_status='SUSPECTED_FRAUD'),
 Row(order_status='PENDING'),
 Row(order_status='CANCELED')]

In [40]:
rows = orders.select("order_status").distinct().collect()

In [41]:
for row in rows:
    print(row)

Row(order_status='PENDING_PAYMENT')
Row(order_status='COMPLETE')
Row(order_status='ON_HOLD')
Row(order_status='PAYMENT_REVIEW')
Row(order_status='PROCESSING')
Row(order_status='CLOSED')
Row(order_status='SUSPECTED_FRAUD')
Row(order_status='PENDING')
Row(order_status='CANCELED')


In [42]:
for row in rows:
    print(row["order_status"])

PENDING_PAYMENT
COMPLETE
ON_HOLD
PAYMENT_REVIEW
PROCESSING
CLOSED
SUSPECTED_FRAUD
PENDING
CANCELED


In [44]:
status_list = sorted([row["order_status"] for row in rows])
status_list

['CANCELED',
 'CLOSED',
 'COMPLETE',
 'ON_HOLD',
 'PAYMENT_REVIEW',
 'PENDING',
 'PENDING_PAYMENT',
 'PROCESSING',
 'SUSPECTED_FRAUD']

In [45]:
products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_image: string (nullable = true)



In [46]:
products.toPandas()

Unnamed: 0,product_id,product_category_id,product_name,product_description,product_price,product_image
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.980000,http://images.acmesports.sports/Quest+Q64+10+F...
1,2,2,Under Armour Men's Highlight MC Football Clea,,129.990005,http://images.acmesports.sports/Under+Armour+M...
2,3,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...
3,4,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...
4,5,2,Riddell Youth Revolution Speed Custom Footbal,,199.990005,http://images.acmesports.sports/Riddell+Youth+...
...,...,...,...,...,...,...
2685,1341,59,Nike Women's Cleveland Browns Johnny Football,,34.000000,http://images.acmesports.sports/Nike+Women%27s...
2686,1342,59,Nike Men's St. Louis Rams Michael Sam #96 Nam,,32.000000,http://images.acmesports.sports/Nike+Men%27s+S...
2687,1343,59,Nike Men's Home Game Jersey St. Louis Rams Mi,,100.000000,http://images.acmesports.sports/Nike+Men%27s+H...
2688,1344,59,Nike Men's Home Game Jersey St. Louis Rams Aa,,100.000000,http://images.acmesports.sports/Nike+Men%27s+H...


In [47]:
categories.cache()
departments.cache()
order_items.cache()
orders.cache()
products.cache()

DataFrame[product_id: int, product_category_id: int, product_name: string, product_description: string, product_price: float, product_image: string]

In [48]:
categories.show()

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
|          6|                     2|   Tennis & Racquet|
|          7|                     2|             Hockey|
|          8|                     2|        More Sports|
|          9|                     3|   Cardio Equipment|
|         10|                     3|  Strength Training|
|         11|                     3|Fitness Accessories|
|         12|                     3|       Boxing & MMA|
|         13|                     3|        Electronics|
|         14|                     3|     Yoga & Pilates|
|         15|                  

In [51]:
categories.select("category_name").distinct().count()

55

In [50]:
categories.unpersist()

DataFrame[category_id: int, category_department_id: int, category_name: string]

In [52]:
categories.select(categories.category_name).distinct().count()

55

In [53]:
categories.category_name

Column<'category_name'>

In [56]:
cats_with_dep_name = categories.join(departments)
cats_with_dep_name

DataFrame[category_id: int, category_department_id: int, category_name: string, department_id: int, department_name: string]

In [57]:
categories.count()

116

In [58]:
departments.count()

12

In [59]:
cats_with_dep_name.count()

1392

In [64]:
116*12

1392

In [65]:
cats_with_dep_name.toPandas()

Unnamed: 0,category_id,category_department_id,category_name,department_id,department_name
0,1,2,Football,2,Fitness
1,1,2,Football,3,Footwear
2,1,2,Football,4,Apparel
3,1,2,Football,5,Golf
4,1,2,Football,6,Outdoors
...,...,...,...,...,...
1387,58,8,NFL Players,3,Footwear
1388,58,8,NFL Players,4,Apparel
1389,58,8,NFL Players,5,Golf
1390,58,8,NFL Players,6,Outdoors


In [62]:
cats = categories.join(departments, categories.category_department_id == departments.department_id)
cats

DataFrame[category_id: int, category_department_id: int, category_name: string, department_id: int, department_name: string]

In [63]:
cats.toPandas()

Unnamed: 0,category_id,category_department_id,category_name,department_id,department_name
0,1,2,Football,2,Fitness
1,1,2,Football,2,Fitness
2,2,2,Soccer,2,Fitness
3,2,2,Soccer,2,Fitness
4,3,2,Baseball & Softball,2,Fitness
...,...,...,...,...,...
187,46,7,Indoor/Outdoor Games,7,Fan Shop
188,47,7,Boating,7,Fan Shop
189,47,7,Boating,7,Fan Shop
190,48,7,Water Sports,7,Fan Shop


In [66]:
cats.columns

['category_id',
 'category_department_id',
 'category_name',
 'department_id',
 'department_name']

In [67]:
cols = ['category_id',
 'category_name',
 'department_id',
 'department_name']

In [68]:
cats.select(cols).toPandas()

Unnamed: 0,category_id,category_name,department_id,department_name
0,1,Football,2,Fitness
1,1,Football,2,Fitness
2,2,Soccer,2,Fitness
3,2,Soccer,2,Fitness
4,3,Baseball & Softball,2,Fitness
...,...,...,...,...
187,46,Indoor/Outdoor Games,7,Fan Shop
188,47,Boating,7,Fan Shop
189,47,Boating,7,Fan Shop
190,48,Water Sports,7,Fan Shop


In [69]:
cats.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_department_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)



In [72]:
cats.drop("category_department_id")

DataFrame[category_id: int, category_name: string, department_id: int, department_name: string]

In [73]:
cats.take(5)

[Row(category_id=1, category_department_id=2, category_name='Football', department_id=2, department_name='Fitness'),
 Row(category_id=1, category_department_id=2, category_name='Football', department_id=2, department_name='Fitness'),
 Row(category_id=2, category_department_id=2, category_name='Soccer', department_id=2, department_name='Fitness'),
 Row(category_id=2, category_department_id=2, category_name='Soccer', department_id=2, department_name='Fitness'),
 Row(category_id=3, category_department_id=2, category_name='Baseball & Softball', department_id=2, department_name='Fitness')]

In [76]:
join_types = ["inner", "cross", "outer", "full", "left_outer", "right_outer"]

In [77]:
for join_type in join_types:
    cats_on_deps = categories.join(departments, categories.category_department_id == departments.department_id, how=join_type)
    count= cats_on_deps.count()
    print(join_type, ": ", count)

inner :  192
cross :  192
outer :  212
full :  212
left_outer :  212
right_outer :  192


In [78]:
categories.sort(categories.category_name).toPandas()

Unnamed: 0,category_id,category_department_id,category_name
0,22,4,Accessories
1,27,5,Accessories
2,40,6,Accessories
3,22,4,Accessories
4,27,5,Accessories
...,...,...,...
111,31,6,Women's Golf Clubs
112,56,8,World Cup Shop
113,56,8,World Cup Shop
114,14,3,Yoga & Pilates


In [79]:
import pyspark.sql.functions as F

In [80]:
categories.sort(F.desc(categories.category_name)).toPandas()

Unnamed: 0,category_id,category_department_id,category_name
0,14,3,Yoga & Pilates
1,14,3,Yoga & Pilates
2,56,8,World Cup Shop
3,56,8,World Cup Shop
4,31,6,Women's Golf Clubs
...,...,...,...
111,27,5,Accessories
112,40,6,Accessories
113,22,4,Accessories
114,27,5,Accessories


## najbardziej popularne kategorie produktów

```
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;
```

In [81]:
join1 = order_items.join(products, order_items.order_item_product_id == products.product_id)

In [82]:
join1

DataFrame[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float, product_id: int, product_category_id: int, product_name: string, product_description: string, product_price: float, product_image: string]

In [83]:
join1.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_image: string (nullable = true)



In [84]:
join2 = join1.join(categories, categories.category_id == join1.product_category_id)
join2.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_image: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- category_department_id: integer (nullable = true)
 |-- category_name: string (nullable = true)



In [85]:
join2.toPandas()

Unnamed: 0,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price,product_id,product_category_id,product_name,product_description,product_price,product_image,category_id,category_department_id,category_name
0,1,1,957,1,299.980011,299.980011,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.980011,http://images.acmesports.sports/Diamondback+Wo...,43,7,Camping & Hiking
1,1,1,957,1,299.980011,299.980011,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.980011,http://images.acmesports.sports/Diamondback+Wo...,43,7,Camping & Hiking
2,1,1,957,1,299.980011,299.980011,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.980011,http://images.acmesports.sports/Diamondback+Wo...,43,7,Camping & Hiking
3,1,1,957,1,299.980011,299.980011,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.980011,http://images.acmesports.sports/Diamondback+Wo...,43,7,Camping & Hiking
4,2,2,1073,1,199.990005,199.990005,1073,48,Pelican Sunstream 100 Kayak,,199.990005,http://images.acmesports.sports/Pelican+Sunstr...,48,7,Water Sports
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1377579,172197,68883,208,1,1999.989990,1999.989990,208,10,SOLE E35 Elliptical,,1999.989990,http://images.acmesports.sports/SOLE+E35+Ellip...,10,3,Strength Training
1377580,172198,68883,502,3,150.000000,50.000000,502,24,Nike Men's Dri-FIT Victory Golf Polo,,50.000000,http://images.acmesports.sports/Nike+Men%27s+D...,24,5,Women's Apparel
1377581,172198,68883,502,3,150.000000,50.000000,502,24,Nike Men's Dri-FIT Victory Golf Polo,,50.000000,http://images.acmesports.sports/Nike+Men%27s+D...,24,5,Women's Apparel
1377582,172198,68883,502,3,150.000000,50.000000,502,24,Nike Men's Dri-FIT Victory Golf Polo,,50.000000,http://images.acmesports.sports/Nike+Men%27s+D...,24,5,Women's Apparel


In [87]:
category_count = join2.groupBy(join2.category_name).count()
category_count

DataFrame[category_name: string, count: bigint]

In [88]:
category_count.toPandas()

Unnamed: 0,category_name,count
0,Men's Golf Clubs,2264
1,Camping & Hiking,109832
2,Fitness Accessories,2472
3,Golf Shoes,4192
4,Basketball,536
5,Men's Footwear,177968
6,Electronics,25248
7,Women's Apparel,168280
8,Girls' Apparel,9608
9,Golf Bags & Carts,488


In [90]:
category_count.describe().toPandas()

Unnamed: 0,summary,category_name,count
0,count,32,32.0
1,mean,,43049.5
2,stddev,,64476.93380807064
3,min,Accessories,488.0
4,max,Women's Golf Clubs,196408.0


In [91]:
category_count.sort(F.desc("count")).toPandas()

Unnamed: 0,category_name,count
0,Cleats,196408
1,Men's Footwear,177968
2,Women's Apparel,168280
3,Indoor/Outdoor Games,154384
4,Fishing,138600
5,Water Sports,124320
6,Camping & Hiking,109832
7,Cardio Equipment,99896
8,Shop By Sport,87872
9,Electronics,25248


In [93]:
categories = spark.read.parquet("d:/uwm/categories")
departments = spark.read.parquet("d:/uwm/departments")
order_items = spark.read.parquet("d:/uwm/order_items")
orders = spark.read.parquet("d:/uwm/orders")
products = spark.read.parquet("d:/uwm/products")

categories.cache()
departments.cache()
order_items.cache()
orders.cache()
products.cache()

DataFrame[product_id: int, product_category_id: int, product_name: string, product_description: string, product_price: float, product_image: string]

In [108]:
join2 = order_items \
    .join(products, order_items.order_item_product_id == products.product_id) \
    .join(categories, categories.category_id == products.product_category_id)

category_count = join2.groupBy(join2.category_name).count().sort(F.desc("count"))

In [110]:
category_count.toPandas()

Unnamed: 0,category_name,count
0,Cleats,196408
1,Men's Footwear,177968
2,Women's Apparel,168280
3,Indoor/Outdoor Games,154384
4,Fishing,138600
5,Water Sports,124320
6,Camping & Hiking,109832
7,Cardio Equipment,99896
8,Shop By Sport,87872
9,Electronics,25248


In [106]:
category_count = order_items \
    .join(products, order_items.order_item_product_id == products.product_id) \
    .join(categories, categories.category_id == products.product_category_id) \
    .groupBy(categories.category_name).count().sort(F.desc("count"))

In [107]:
category_count.toPandas()

Unnamed: 0,category_name,count
0,Cleats,196408
1,Men's Footwear,177968
2,Women's Apparel,168280
3,Indoor/Outdoor Games,154384
4,Fishing,138600
5,Water Sports,124320
6,Camping & Hiking,109832
7,Cardio Equipment,99896
8,Shop By Sport,87872
9,Electronics,25248


In [101]:
categories.createOrReplaceTempView("categories")
departments.createOrReplaceTempView("departments")
order_items.createOrReplaceTempView("order_items")
products.createOrReplaceTempView("products")

In [102]:
wynik = spark.sql("""

select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc

""")

In [103]:
wynik

DataFrame[category_name: string, count: bigint]

In [104]:
wynik.toPandas()

Unnamed: 0,category_name,count
0,Cleats,196408
1,Men's Footwear,177968
2,Women's Apparel,168280
3,Indoor/Outdoor Games,154384
4,Fishing,138600
5,Water Sports,124320
6,Camping & Hiking,109832
7,Cardio Equipment,99896
8,Shop By Sport,87872
9,Electronics,25248
