In [112]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *


In [2]:
spark = SparkSession.builder.master("local[*]").appName("agg").getOrCreate()

In [3]:
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("../data/retail-data/all/*.csv")
    .coalesce(5)
)
df.cache()


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [4]:
df.count()

541909

In [6]:
df.select(count("StockCode").alias("count_stock_code")).show()

+----------------+
|count_stock_code|
+----------------+
|          541909|
+----------------+



In [7]:
df.select(count_distinct("StockCode").alias("count_stock_code")).show()

+----------------+
|count_stock_code|
+----------------+
|            4070|
+----------------+



In [8]:
df.select(approx_count_distinct("StockCode", 0.03).alias("count_stock_code")).show()

+----------------+
|count_stock_code|
+----------------+
|            4068|
+----------------+



In [9]:
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [10]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [11]:
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [12]:
df.select(sum_distinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [13]:
df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"),
).selectExpr(
    "(total_purchases/total_transactions) as ratio_purchases_transactions",
    "avg_purchases",
    "mean_purchases",
).show()


+----------------------------+----------------+----------------+
|ratio_purchases_transactions|   avg_purchases|  mean_purchases|
+----------------------------+----------------+----------------+
|            9.55224954743324|9.55224954743324|9.55224954743324|
+----------------------------+----------------+----------------+



In [14]:
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



In [15]:
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



grouping

In [18]:
(
    df.where(col("CustomerId").isNotNull())
    .groupBy("InvoiceNo", "CustomerId")
    .count()
    .orderBy(desc("count"))
).show()


+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   576339|     14096|  542|
|   579196|     14096|  533|
|   580727|     14096|  529|
|   578270|     14096|  442|
|   573576|     14096|  435|
|   567656|     14096|  421|
|   567183|     14769|  399|
|   575607|     14096|  377|
|   571441|     14096|  364|
|   570488|     14096|  353|
|   572552|     14096|  352|
|   568346|     14096|  335|
|   547063|     14769|  294|
|   569246|     14096|  285|
|   562031|     16984|  277|
|   554098|     14769|  264|
|   570672|     12536|  259|
|   543040|     17337|  259|
|   569897|     17813|  239|
|   572103|     17571|  223|
+---------+----------+-----+
only showing top 20 rows



In [28]:
(df.groupBy("InvoiceNo").agg(count_distinct("Quantity").alias("quan"))).show()

+---------+----+
|InvoiceNo|quan|
+---------+----+
|   543641|   8|
|   541518|  17|
|   541783|   7|
|   540976|   8|
|   542375|   4|
|   536938|   4|
|   539630|   7|
|   538879|   7|
|   539275|   4|
|   544303|   4|
|   538184|   7|
|   538517|   8|
|   540540|   4|
|   537691|   9|
|   538041|   1|
|   541432|   4|
|  C542604|   5|
|   542026|   3|
|   540499|   8|
|   537252|   1|
+---------+----+
only showing top 20 rows



In [33]:
pessoas = (
    df.groupBy("InvoiceNo")
    .agg(sum("Quantity").alias("sum_quantity"))
    .where(col("sum_quantity") > 0)
    .dropDuplicates()
)


In [38]:
pessoas.agg(max(col("sum_quantity")), min(col("sum_quantity"))).show()

+-----------------+-----------------+
|max(sum_quantity)|min(sum_quantity)|
+-----------------+-----------------+
|            80995|                1|
+-----------------+-----------------+



In [49]:
w = Window.orderBy(desc("sum_quantity"))

pessoas_10_pct = pessoas.withColumn("10_porcento", ntile(10).over(w))


In [55]:
# pessoas_10_pct.orderBy(desc("sum_quantity")).show()


pessoas_10_pct.where(col("10_porcento") == 1).agg(min(col("sum_quantity"))).show()


+-----------------+
|min(sum_quantity)|
+-----------------+
|              522|
+-----------------+



In [110]:
df_with_date = df.withColumn("date", to_date(col("InvoiceDate"), "M/d/y H:mm"))

In [111]:
df_with_date.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|U

In [113]:
windowSpec = (
    Window.partitionBy("CustomerID", "date")
    .orderBy(desc("Quantity"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)
sumPurchase = sum(("Quantity")).over(windowSpec)


In [119]:
df_with_date.where("CustomerId IS NOT NULL").orderBy("CustomerId").select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantity_rank"),
    purchaseDenseRank.alias("quantity_dense_rank"),
    maxPurchaseQuantity.alias("max_purchase_quantity"),
    sumPurchase.alias("sum_quantity"),
).show()


+----------+----------+--------+-------------+-------------------+---------------------+------------+
|CustomerId|      date|Quantity|quantity_rank|quantity_dense_rank|max_purchase_quantity|sum_quantity|
+----------+----------+--------+-------------+-------------------+---------------------+------------+
|     12346|2011-01-18|   74215|            1|                  1|                74215|       74215|
|     12346|2011-01-18|  -74215|            2|                  2|                74215|           0|
|     12347|2010-12-07|      36|            1|                  1|                   36|          36|
|     12347|2010-12-07|      30|            2|                  2|                   36|          66|
|     12347|2010-12-07|      24|            3|                  3|                   36|          90|
|     12347|2010-12-07|      12|            4|                  4|                   36|         102|
|     12347|2010-12-07|      12|            4|                  4|                

In [120]:
pivoted = df_with_date.groupBy("date").pivot("Country").sum()

In [123]:
pivoted.filter('date > "2011-12-05"').select("date", "`USA_sum(Quantity)`").show()

+----------+-----------------+
|      date|USA_sum(Quantity)|
+----------+-----------------+
|2011-12-06|             null|
|2011-12-09|             null|
|2011-12-08|             -196|
|2011-12-07|             null|
+----------+-----------------+



join

In [125]:
person = spark.createDataFrame(
    [
        (0, "Bill Chambers", 0, [100]),
        (1, "Matei Zaharia", 1, [500, 250, 100]),
        (2, "Michael Armbrust", 1, [250, 100]),
    ]
).toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame(
    [
        (0, "Masters", "School of Information", "UC Berkeley"),
        (2, "Masters", "EECS", "UC Berkeley"),
        (1, "Ph.D.", "EECS", "UC Berkeley"),
    ]
).toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame(
    [(500, "Vice President"), (250, "PMC Member"), (100, "Contributor")]
).toDF("id", "status")


Py4JError: An error occurred while calling o716.legacyInferArrayTypeFromFirstElement. Trace:
py4j.Py4JException: Method legacyInferArrayTypeFromFirstElement([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)

