In [4]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DS").getOrCreate()
spark

In [6]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [8]:
df.groupBy("color").count().show()

+-----+-----+
|color|count|
+-----+-----+
|  red|    5|
|black|    1|
| blue|    2|
+-----+-----+



In [9]:
df.groupBy("color").avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+



In [11]:
avg = df.groupBy("color").avg()
avg.cache()

DataFrame[color: string, avg(v1): double, avg(v2): double]

In [12]:
avg1 = avg.select(avg.color, avg["avg(v1)"].alias("avg_v1"))

In [13]:
avg1.show()

+-----+------+
|color|avg_v1|
+-----+------+
|  red|   4.8|
|black|   6.0|
| blue|   3.0|
+-----+------+



In [14]:
import pyspark.sql.functions as F

In [15]:
avg2 = avg.select(F.monotonically_increasing_id().alias("id"), avg.color, avg["avg(v2)"].alias("avg_v2"))

In [16]:
avg2.toPandas()

Unnamed: 0,id,color,avg_v2
0,781684047872,red,48.0
1,1279900254208,black,60.0
2,1443109011456,blue,30.0


In [18]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-3.0.0-cp38-cp38-win_amd64.whl (12.7 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-3.0.0


In [19]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+



In [20]:
dfp = df.groupby('color').applyInPandas(plus_mean, schema=df.schema)

In [25]:
dfp.describe().show()

+-------+-----+------+------------------+-----------------+
|summary|color| fruit|                v1|               v2|
+-------+-----+------+------------------+-----------------+
|  count|    8|     8|                 8|                8|
|   mean| null|  null|             0.125|             45.0|
| stddev| null|  null|1.8850918886280925|24.49489742783178|
|    min|black|banana|                -3|               10|
|    max|  red| grape|                 3|               80|
+-------+-----+------+------------------+-----------------+



In [30]:
dfp.explain()

== Physical Plan ==
FlatMapGroupsInPandas [color#0], plus_mean(color#0, fruit#1, v1#2L, v2#3L), [color#311, fruit#312, v1#313L, v2#314L]
+- *(2) Sort [color#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(color#0, 200), ENSURE_REQUIREMENTS, [id=#424]
      +- *(1) Project [color#0, color#0, fruit#1, v1#2L, v2#3L]
         +- *(1) Scan ExistingRDD[color#0,fruit#1,v1#2L,v2#3L]




In [33]:
!dir d:\uwm\

 Volume in drive D is Universum
 Volume Serial Number is 5FC7-5225

 Directory of d:\uwm

20.03.2021  03:43    <DIR>          .
20.03.2021  03:43    <DIR>          ..
21.03.2021  11:24    <DIR>          customers
21.03.2021  11:24    <DIR>          departments
21.03.2021  11:24    <DIR>          order_items
21.03.2021  11:24    <DIR>          orders
21.03.2021  11:24    <DIR>          products
21.03.2021  11:26    <DIR>          categories
27.03.2021  05:11        39˙593˙868 access.log.2
27.03.2021  06:21    <DIR>          cloudera-quickstart
27.03.2021  06:22    <DIR>          examples
27.03.2021  06:29    <DIR>          gen_logs
               1 File(s)     39˙593˙868 bytes
              11 Dir(s)  1˙414˙731˙530˙240 bytes free


In [39]:
categories = spark.read.parquet("d:/uwm/categories")
departments = spark.read.parquet("d:/uwm/departments")
order_items = spark.read.parquet("d:/uwm/order_items")
orders = spark.read.parquet("d:/uwm/orders")
products = spark.read.parquet("d:/uwm/products")

In [36]:
categories.show()

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
|          6|                     2|   Tennis & Racquet|
|          7|                     2|             Hockey|
|          8|                     2|        More Sports|
|          9|                     3|   Cardio Equipment|
|         10|                     3|  Strength Training|
|         11|                     3|Fitness Accessories|
|         12|                     3|       Boxing & MMA|
|         13|                     3|        Electronics|
|         14|                     3|     Yoga & Pilates|
|         15|                  

In [37]:
categories.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_department_id: integer (nullable = true)
 |-- category_name: string (nullable = true)



In [40]:
categories.cache()
departments.cache()
order_items.cache()
orders.cache()
products.cache()

DataFrame[product_id: int, product_category_id: int, product_name: string, product_description: string, product_price: float, product_image: string]

In [41]:
departments.printSchema()

root
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)



In [42]:
order_items.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [43]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [44]:
products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_image: string (nullable = true)



In [45]:
categories.select(categories.category_name).show()

+-------------------+
|      category_name|
+-------------------+
|           Football|
|             Soccer|
|Baseball & Softball|
|         Basketball|
|           Lacrosse|
|   Tennis & Racquet|
|             Hockey|
|        More Sports|
|   Cardio Equipment|
|  Strength Training|
|Fitness Accessories|
|       Boxing & MMA|
|        Electronics|
|     Yoga & Pilates|
|  Training by Sport|
|    As Seen on  TV!|
|             Cleats|
|     Men's Footwear|
|   Women's Footwear|
|     Kids' Footwear|
+-------------------+
only showing top 20 rows



In [46]:
"""-- najbardziej popularne kategorie produktów
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;
"""

'-- najbardziej popularne kategorie produktów\nselect c.category_name, count(order_item_quantity) as count\nfrom order_items oi\ninner join products p on oi.order_item_product_id = p.product_id\ninner join categories c on c.category_id = p.product_category_id\ngroup by c.category_name\norder by count desc\nlimit 10;\n'

In [50]:
cats = categories.join(departments, categories.category_department_id == departments.department_id)

In [51]:
cats.show()

+-----------+----------------------+-------------------+-------------+---------------+
|category_id|category_department_id|      category_name|department_id|department_name|
+-----------+----------------------+-------------------+-------------+---------------+
|          1|                     2|           Football|            2|        Fitness|
|          1|                     2|           Football|            2|        Fitness|
|          2|                     2|             Soccer|            2|        Fitness|
|          2|                     2|             Soccer|            2|        Fitness|
|          3|                     2|Baseball & Softball|            2|        Fitness|
|          3|                     2|Baseball & Softball|            2|        Fitness|
|          4|                     2|         Basketball|            2|        Fitness|
|          4|                     2|         Basketball|            2|        Fitness|
|          5|                     2|       

In [52]:
cats = cats.drop(categories.category_department_id)

In [53]:
cats.show()

+-----------+-------------------+-------------+---------------+
|category_id|      category_name|department_id|department_name|
+-----------+-------------------+-------------+---------------+
|          1|           Football|            2|        Fitness|
|          1|           Football|            2|        Fitness|
|          2|             Soccer|            2|        Fitness|
|          2|             Soccer|            2|        Fitness|
|          3|Baseball & Softball|            2|        Fitness|
|          3|Baseball & Softball|            2|        Fitness|
|          4|         Basketball|            2|        Fitness|
|          4|         Basketball|            2|        Fitness|
|          5|           Lacrosse|            2|        Fitness|
|          5|           Lacrosse|            2|        Fitness|
|          6|   Tennis & Racquet|            2|        Fitness|
|          6|   Tennis & Racquet|            2|        Fitness|
|          7|             Hockey|       

In [55]:
cats.sort(cats.category_name).show()

+-----------+-------------------+-------------+---------------+
|category_id|      category_name|department_id|department_name|
+-----------+-------------------+-------------+---------------+
|         40|        Accessories|            6|       Outdoors|
|         40|        Accessories|            6|       Outdoors|
|         22|        Accessories|            4|        Apparel|
|         40|        Accessories|            6|       Outdoors|
|         22|        Accessories|            4|        Apparel|
|         27|        Accessories|            5|           Golf|
|         27|        Accessories|            5|           Golf|
|         27|        Accessories|            5|           Golf|
|         22|        Accessories|            4|        Apparel|
|         22|        Accessories|            4|        Apparel|
|         27|        Accessories|            5|           Golf|
|         40|        Accessories|            6|       Outdoors|
|         16|    As Seen on  TV!|       

In [56]:
cats.sort(cats.category_name).take(10)

[Row(category_id=27, category_name='Accessories', department_id=5, department_name='Golf'),
 Row(category_id=40, category_name='Accessories', department_id=6, department_name='Outdoors'),
 Row(category_id=22, category_name='Accessories', department_id=4, department_name='Apparel'),
 Row(category_id=40, category_name='Accessories', department_id=6, department_name='Outdoors'),
 Row(category_id=22, category_name='Accessories', department_id=4, department_name='Apparel'),
 Row(category_id=40, category_name='Accessories', department_id=6, department_name='Outdoors'),
 Row(category_id=27, category_name='Accessories', department_id=5, department_name='Golf'),
 Row(category_id=27, category_name='Accessories', department_id=5, department_name='Golf'),
 Row(category_id=22, category_name='Accessories', department_id=4, department_name='Apparel'),
 Row(category_id=27, category_name='Accessories', department_id=5, department_name='Golf')]

In [57]:
"""-- najbardziej popularne kategorie produktów
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;
"""

'-- najbardziej popularne kategorie produktów\nselect c.category_name, count(order_item_quantity) as count\nfrom order_items oi\ninner join products p on oi.order_item_product_id = p.product_id\ninner join categories c on c.category_id = p.product_category_id\ngroup by c.category_name\norder by count desc\nlimit 10;\n'

In [58]:
join1 = order_items.join(products, products.product_id == order_items.order_item_product_id)

In [59]:
join2 = join1.join(categories, categories.category_id == join1.product_category_id)

In [63]:
category_count = join2.groupBy(join2.category_name).count()

In [64]:
category_count.show()

+-------------------+------+
|      category_name| count|
+-------------------+------+
|   Men's Golf Clubs|  2264|
|   Camping & Hiking|109832|
|Fitness Accessories|  2472|
|         Golf Shoes|  4192|
|         Basketball|   536|
|     Men's Footwear|177968|
|        Electronics| 25248|
|    Women's Apparel|168280|
|     Girls' Apparel|  9608|
|  Golf Bags & Carts|   488|
|    As Seen on  TV!|   544|
|       Boxing & MMA|  3384|
|Baseball & Softball|  5056|
| Hunting & Shooting|  3520|
|       Golf Apparel|  3528|
|           Trade-In|  7792|
| Women's Golf Clubs|  1448|
|      Shop By Sport| 87872|
|            Fishing|138600|
|       Water Sports|124320|
+-------------------+------+
only showing top 20 rows



In [65]:
category_count.sort("category_name").toPandas()

Unnamed: 0,category_name,count
0,Accessories,14240
1,As Seen on TV!,544
2,Baseball & Softball,5056
3,Basketball,536
4,Boxing & MMA,3384
5,Camping & Hiking,109832
6,Cardio Equipment,99896
7,Cleats,196408
8,Electronics,25248
9,Fishing,138600


In [66]:
category_count.sort(F.desc("category_name")).toPandas()

Unnamed: 0,category_name,count
0,Women's Golf Clubs,1448
1,Women's Apparel,168280
2,Water Sports,124320
3,Trade-In,7792
4,Tennis & Racquet,2624
5,Strength Training,888
6,Soccer,1104
7,Shop By Sport,87872
8,Men's Golf Clubs,2264
9,Men's Footwear,177968


In [67]:
category_count.sort(F.desc("count")).toPandas()

Unnamed: 0,category_name,count
0,Cleats,196408
1,Men's Footwear,177968
2,Women's Apparel,168280
3,Indoor/Outdoor Games,154384
4,Fishing,138600
5,Water Sports,124320
6,Camping & Hiking,109832
7,Cardio Equipment,99896
8,Shop By Sport,87872
9,Electronics,25248


In [71]:
categories.createOrReplaceTempView("categories")
departments.createOrReplaceTempView("departments")
order_items.createOrReplaceTempView("order_items")
products.createOrReplaceTempView("products")

In [75]:
spark.sql("""
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
""").show()

+--------------------+------+
|       category_name| count|
+--------------------+------+
|              Cleats|196408|
|      Men's Footwear|177968|
|     Women's Apparel|168280|
|Indoor/Outdoor Games|154384|
|             Fishing|138600|
|        Water Sports|124320|
|    Camping & Hiking|109832|
|    Cardio Equipment| 99896|
|       Shop By Sport| 87872|
|         Electronics| 25248|
|         Accessories| 14240|
|          Golf Balls| 11800|
|      Girls' Apparel|  9608|
|         Golf Gloves|  8560|
|            Trade-In|  7792|
| Baseball & Softball|  5056|
|              Hockey|  4912|
|          Golf Shoes|  4192|
|        Golf Apparel|  3528|
|  Hunting & Shooting|  3520|
+--------------------+------+
only showing top 20 rows

