In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\apachespark\\spark-2.4.7-bin-hadoop2.7'

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Demo').getOrCreate()

In [3]:
from pyspark.sql.types import *
schemaTransData = StructType([
    StructField("txnid", IntegerType(), True),
    StructField("txndate", StringType(), True),
    StructField("custid", IntegerType(), True),
    StructField("amount", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("subcategory", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("txntype", StringType(), True),
                    ])

In [4]:
transDataDF = spark.read.schema(schemaTransData).csv('txnsSmall')

In [5]:
transDataDF.show(5)

+-----+----------+-------+------+------------------+--------------------+-----------+----------+-------+
|txnid|   txndate| custid|amount|          category|         subcategory|       city|     state|txntype|
+-----+----------+-------+------+------------------+--------------------+-----------+----------+-------+
|    0|06-26-2011|4007024| 40.33|Exercise & Fitness|Cardio Machine Ac...|Clarksville| Tennessee| credit|
|    1|05-26-2011|4006742|198.44|Exercise & Fitness|Weightlifting Gloves| Long Beach|California| credit|
|    2|06-01-2011|4009775|  5.58|Exercise & Fitness|Weightlifting Mac...|    Anaheim|California| credit|
|    3|06-05-2011|4002199|198.19|        Gymnastics|    Gymnastics Rings|  Milwaukee| Wisconsin| credit|
|    4|12-17-2011|4002613| 98.81|       Team Sports|        Field Hockey|Nashville  | Tennessee| credit|
+-----+----------+-------+------+------------------+--------------------+-----------+----------+-------+
only showing top 5 rows



In [6]:
transDataDF.printSchema()

root
 |-- txnid: integer (nullable = true)
 |-- txndate: string (nullable = true)
 |-- custid: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- txntype: string (nullable = true)



# Goal: Find  the total revenue generated based on category

### Approach 1 - SQL - based Approach

In [7]:
transDataDF.write.parquet("txnsPARQUET")

In [8]:
transDataDF.registerTempTable("transactions")

In [11]:
spark.sql("select count(txnid) as cash_txns from transactions where txntype = 'cash'").show()

+---------+
|cash_txns|
+---------+
|       64|
+---------+



In [21]:
spark.sql("select category, count(txnid) as highest_selling_category from transactions group by category order by highest_selling_category desc").show(1)

+------------------+------------------------+
|          category|highest_selling_category|
+------------------+------------------------+
|Outdoor Recreation|                      57|
+------------------+------------------------+
only showing top 1 row



In [22]:
spark.sql("select category, count(amount) as lowest_selling_category from transactions group by category order by lowest_selling_category asc").show(1)

+--------+-----------------------+
|category|lowest_selling_category|
+--------+-----------------------+
| Puzzles|                      7|
+--------+-----------------------+
only showing top 1 row



In [26]:
spark.sql("select t.subcategory, count(t.txnid) as subcategory_total from transactions t where t.category = (select category from (select count(txnid) as category_total, category from transactions group by category order by category_total desc limit 1) a) group by t.subcategory order by subcategory_total asc").show(1)

+-----------+-----------------+
|subcategory|subcategory_total|
+-----------+-----------------+
|       Golf|                1|
+-----------+-----------------+
only showing top 1 row



### Approach 2 - Using Programming Approach

In [48]:
from pyspark.sql.functions import *

In [31]:
transDataDF.where("txntype = 'cash'").agg({'txnid':'count'}).alias("cash_txns").show()

+------------+
|count(txnid)|
+------------+
|          64|
+------------+



In [32]:
transDataDF.where("txntype = 'cash'").count()

64

In [50]:
transDataDF.groupBy('category').agg({'txnid':'count'}).withColumnRenamed("count(txnid)", "count_txns").sort(desc("count_txns")).show(1)

+------------------+----------+
|          category|count_txns|
+------------------+----------+
|Outdoor Recreation|        57|
+------------------+----------+
only showing top 1 row



In [51]:
transDataDF.groupBy('category').agg({'txnid':'count'}).withColumnRenamed("count(txnid)", "count_txns").sort(asc("count_txns")).show(1)

+--------+----------+
|category|count_txns|
+--------+----------+
| Puzzles|         7|
+--------+----------+
only showing top 1 row



In [69]:
transDataDF.filter(transDataDF.category==transDataDF.groupBy('category').agg({'txnid':'count'}).sort(desc("count(txnid)")).collect()[0][0]).groupby('subcategory').agg({'txnid':'count'}).withColumnRenamed("count(txnid)", "count_txns").sort(asc("count_txns")).show(1)

+-----------+----------+
|subcategory|count_txns|
+-----------+----------+
|       Golf|         1|
+-----------+----------+
only showing top 1 row



In [None]:
#https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame