# Preparation

## Initialization

Ada 2 pilihan kalau pake `python`, bisa langsung pake `pyspark` library, atau download versi build nya dari web apache-spark nya. Tapi jika mau yg versi build, jangan lupa instalasi JAVA terlebih dahulu

In [1]:
# Install pyspark
!pip install pyspark

Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m112.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.5


## Data Download

In [2]:
#Download datasets
from IPython.display import clear_output

! wget https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv

--2024-05-23 15:45:40--  https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45038760 (43M) [text/plain]
Saving to: ‘online-retail-dataset.csv’


2024-05-23 15:46:39 (854 KB/s) - ‘online-retail-dataset.csv’ saved [45038760/45038760]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("dibimbingDF").getOrCreate()

# Advanced Column Operation

## Window Function

In [4]:
# create dataframe
data = [("John", "Sales", 5000),
        ("Sara", "Engineering", 6500),
        ("Chris", "Sales", 5500),
        ("Rachel", "Engineering", 6500),
        ("Maria", "Engineering", 6000),
        ("Eric", "Sales", 4500),
        ("Tom", "Engineering", 7000),
        ("Kim", "Sales", 4000),
        ("Mike", "Engineering", 7500)]

schema = ["name", "department", "salary"]

In [5]:
# add rank and dense_rank columns
from pyspark.sql.functions import rank, dense_rank, ntile, row_number, col
from pyspark.sql.window import Window

df = spark.createDataFrame(data, schema)

w1 = Window().partitionBy("department").orderBy(col("salary").desc())
df = (
    df
    .withColumn("rank", rank().over(w1))
    .withColumn("dense_rank", dense_rank().over(w1))
    .withColumn("ntile", ntile(4).over(w1))
    .withColumn("row_num", row_number().over(w1))
)

In [6]:
df.show()

+------+-----------+------+----+----------+-----+-------+
|  name| department|salary|rank|dense_rank|ntile|row_num|
+------+-----------+------+----+----------+-----+-------+
|  Mike|Engineering|  7500|   1|         1|    1|      1|
|   Tom|Engineering|  7000|   2|         2|    1|      2|
|  Sara|Engineering|  6500|   3|         3|    2|      3|
|Rachel|Engineering|  6500|   3|         3|    3|      4|
| Maria|Engineering|  6000|   5|         4|    4|      5|
| Chris|      Sales|  5500|   1|         1|    1|      1|
|  John|      Sales|  5000|   2|         2|    2|      2|
|  Eric|      Sales|  4500|   3|         3|    3|      3|
|   Kim|      Sales|  4000|   4|         4|    4|      4|
+------+-----------+------+----+----------+-----+-------+



In [7]:
from pyspark.sql.functions import col, lead, lag, unix_timestamp
from pyspark.sql.window import Window

# create example dataset
data = [
    ('customer1', '2022-01-01 00:00:00'),
    ('customer1', '2022-01-02 00:00:00'),
    ('customer1', '2022-01-03 00:00:00'),
    ('customer2', '2022-01-01 00:00:00'),
    ('customer2', '2022-01-04 00:00:00'),
    ('customer2', '2022-01-06 00:00:00'),
]

df = (
    spark
    .createDataFrame(data, ['customer', 'transaction_time'])
    .withColumn('transaction_time_unix', unix_timestamp('transaction_time'))
)

# create window partitioned by customer and ordered by transaction_time
w = Window.partitionBy('customer').orderBy('transaction_time_unix')

(
    df
 .withColumn('prev_transaction_time', lag('transaction_time_unix').over(w))
 .withColumn('next_transaction_time', lead('transaction_time_unix').over(w))
 .withColumn('time_gap', col('transaction_time_unix') - col('prev_transaction_time'))
 .withColumn('next_time_gap', col('next_transaction_time') - col('transaction_time_unix'))
 .show()
)

+---------+-------------------+---------------------+---------------------+---------------------+--------+-------------+
| customer|   transaction_time|transaction_time_unix|prev_transaction_time|next_transaction_time|time_gap|next_time_gap|
+---------+-------------------+---------------------+---------------------+---------------------+--------+-------------+
|customer1|2022-01-01 00:00:00|           1640995200|                 null|           1641081600|    null|        86400|
|customer1|2022-01-02 00:00:00|           1641081600|           1640995200|           1641168000|   86400|        86400|
|customer1|2022-01-03 00:00:00|           1641168000|           1641081600|                 null|   86400|         null|
|customer2|2022-01-01 00:00:00|           1640995200|                 null|           1641254400|    null|       259200|
|customer2|2022-01-04 00:00:00|           1641254400|           1640995200|           1641427200|  259200|       172800|
|customer2|2022-01-06 00:00:00| 

In [8]:
from pyspark.sql.functions import sum, first, last, mean, stddev, window
from pyspark.sql import Window

# create a sample dataframe
sales_data = [("Product A", 100, "2022-01-01 10:00:00", "Location 1"),
              ("Product B", 200, "2022-01-01 11:00:00", "Location 1"),
              ("Product A", 150, "2022-01-02 12:00:00", "Location 2"),
              ("Product C", 300, "2022-01-03 14:00:00", "Location 2"),
              ("Product D", 250, "2022-01-04 15:00:00", "Location 2"),
              ("Product E", 150, "2022-01-05 16:00:00", "Location 3"),
              ("Product F", 180, "2022-01-06 17:00:00", "Location 3"),
              ("Product G", 220, "2022-01-07 18:00:00", "Location 3")]

sales_df = spark.createDataFrame(sales_data, ["Product", "Price", "Sale Time", "Location"])

In [9]:

# define window specification to partition by location and order by sale time
location_window_spec = Window.partitionBy("Location").orderBy("Sale Time")

# calculate total sales revenue for each location over the entire duration of the dataset
total_sales_revenue = sum("Price").over(Window.partitionBy("Location"))

# calculate first and last sale date for each location
first_sale_date = first("Sale Time").over(location_window_spec)
last_sale_date = last("Sale Time").over(location_window_spec)

# calculate average and standard deviation of the sale prices for each location
average_sale_price = mean("Price").over(location_window_spec)
stddev_sale_price = stddev("Price").over(location_window_spec)

# add the calculated columns to the dataframe
sales_df = (
    sales_df
    .withColumn("Total Sales Revenue", total_sales_revenue)
    .withColumn("First Sale Date", first_sale_date)
    .withColumn("Last Sale Date", last_sale_date)
    .withColumn("Average Sale Price", average_sale_price)
    .withColumn("Stddev Sale Price", stddev_sale_price)
)

# show the results
sales_df.show()

+---------+-----+-------------------+----------+-------------------+-------------------+-------------------+------------------+------------------+
|  Product|Price|          Sale Time|  Location|Total Sales Revenue|    First Sale Date|     Last Sale Date|Average Sale Price| Stddev Sale Price|
+---------+-----+-------------------+----------+-------------------+-------------------+-------------------+------------------+------------------+
|Product A|  100|2022-01-01 10:00:00|Location 1|                300|2022-01-01 10:00:00|2022-01-01 10:00:00|             100.0|              null|
|Product B|  200|2022-01-01 11:00:00|Location 1|                300|2022-01-01 10:00:00|2022-01-01 11:00:00|             150.0| 70.71067811865476|
|Product A|  150|2022-01-02 12:00:00|Location 2|                700|2022-01-02 12:00:00|2022-01-02 12:00:00|             150.0|              null|
|Product C|  300|2022-01-03 14:00:00|Location 2|                700|2022-01-02 12:00:00|2022-01-03 14:00:00|          

In [10]:
# group the data by location and window the data by day to get daily aggregated sales data
(
    sales_df
    .groupBy(window("Sale Time", "1 day"))
    .agg(sum("Price").alias("Daily Sales Revenue"))
    .show(truncate=False)
)

+------------------------------------------+-------------------+
|window                                    |Daily Sales Revenue|
+------------------------------------------+-------------------+
|{2022-01-01 00:00:00, 2022-01-02 00:00:00}|300                |
|{2022-01-02 00:00:00, 2022-01-03 00:00:00}|150                |
|{2022-01-03 00:00:00, 2022-01-04 00:00:00}|300                |
|{2022-01-04 00:00:00, 2022-01-05 00:00:00}|250                |
|{2022-01-05 00:00:00, 2022-01-06 00:00:00}|150                |
|{2022-01-06 00:00:00, 2022-01-07 00:00:00}|180                |
|{2022-01-07 00:00:00, 2022-01-08 00:00:00}|220                |
+------------------------------------------+-------------------+



In [11]:
# group the data by location and window the data by day to get daily aggregated sales data
(
    sales_df
    .groupBy("Location", window("Sale Time", "1 day"))
    .agg(sum("Price").alias("Daily Sales Revenue"))
    .show(truncate=False)
)

+----------+------------------------------------------+-------------------+
|Location  |window                                    |Daily Sales Revenue|
+----------+------------------------------------------+-------------------+
|Location 1|{2022-01-01 00:00:00, 2022-01-02 00:00:00}|300                |
|Location 2|{2022-01-02 00:00:00, 2022-01-03 00:00:00}|150                |
|Location 2|{2022-01-03 00:00:00, 2022-01-04 00:00:00}|300                |
|Location 2|{2022-01-04 00:00:00, 2022-01-05 00:00:00}|250                |
|Location 3|{2022-01-05 00:00:00, 2022-01-06 00:00:00}|150                |
|Location 3|{2022-01-06 00:00:00, 2022-01-07 00:00:00}|180                |
|Location 3|{2022-01-07 00:00:00, 2022-01-08 00:00:00}|220                |
+----------+------------------------------------------+-------------------+



## UDF

In [12]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Create a sample dataframe
df = spark.createDataFrame([(1, "apple"), (2, "banana"), (3, "orange"),
                            (4, "apple"), (5, "banana"), (6, "orange"),
                            (7, "apple"), (8, "banana"), (9, "orange"),
                            (10, "apple")], ["id", "fruit"])

In [13]:
# Define the UDF logic
def string_length(s):
    return len(s)

# Define the Python UDF
string_length_udf = udf(string_length, IntegerType())

# Apply the Python UDF and display the result
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



## Pandas UDF

In [14]:
import pandas as pd

# Define the Pandas UDF
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    return s.str.len()

# Apply the Pandas UDF and display the result
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



Geser ke local notebook dulu gan

# Partition & Bucket

## Data Cleanup

In [26]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import to_timestamp, to_date


schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True)
])

retail_df = (
    spark
    .read.csv('online-retail-dataset.csv',header=True, schema=schema)
    .withColumn("InvoiceTime", to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))
    .withColumn("InvoiceDate", to_date("InvoiceDate", "M/d/yyyy H:mm"))
)
retail_df.show(10)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|        InvoiceTime|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    22752|SET 7 BABUSHKA NE..

## Partition

In [27]:
(
    retail_df
 .write
 .mode("overwrite")
 .partitionBy('InvoiceDate')
 .parquet("/content/spark/online-retail-partitioned")
)

Py4JJavaError: An error occurred while calling o589.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 37.0 failed 1 times, most recent failure: Lost task 10.0 in stage 37.0 (TID 171) (dibimbing-jupyter executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:358)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Mkdirs failed to create file:/content/spark/online-retail-partitioned/_temporary/0/_temporary/attempt_202405231637018802973701119427784_0037_m_000010_171/InvoiceDate=2011-11-27 (exists=false, cwd=file:/home/jovyan)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:158)
	at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:298)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:365)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:341)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	... 42 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:358)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.io.IOException: Mkdirs failed to create file:/content/spark/online-retail-partitioned/_temporary/0/_temporary/attempt_202405231637018802973701119427784_0037_m_000010_171/InvoiceDate=2011-11-27 (exists=false, cwd=file:/home/jovyan)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:158)
	at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:298)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:365)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:341)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	... 9 more


In [None]:
partitioned_table = (
    spark
    .read
    .parquet("/content/spark/online-retail-partitioned")
)

In [None]:
partitioned_table.printSchema()

In [None]:
partitioned_table.explain()

In [None]:
partitioned_table.registerTempTable("partitioned_table")

In [None]:
spark.sql("select * from partitioned_table where InvoiceDate='2010-12-08' limit 10").explain()

## Bucket

In [None]:
from pyspark.sql.functions import hour


(
    retail_df
 .withColumn("Hour", hour("InvoiceTime"))
 .write
 .mode("overwrite")
 .partitionBy("InvoiceDate")
 .bucketBy(2, "Hour")
 .sortBy("InvoiceTime")
 .option("path", "/content/spark/online-retail-bucketed")
 .saveAsTable("retail_bucketed")
)


Now we have 2 parquet, instead of 1

# Pandas API On Spark

In [18]:
import pyspark.pandas as ps

retail_ps = retail_df.pandas_api()

'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.


In [19]:
retail_ps.head()

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

In [20]:
aggregated_df = retail_ps.groupby("InvoiceDate").agg({"InvoiceNo": "count"}).reset_index()
aggregated_df.columns = ["InvoiceDate", "InvoiceCount"]
aggregated_df["InvoiceDate"] = aggregated_df["InvoiceDate"].astype("datetime64[ns]")

In [21]:
aggregated_df.head()

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

In [22]:
retail_ps[["UnitPrice"]].plot.hist()

ImportError: plotly is required for plotting when the default backend 'plotly' is selected.

In [24]:
import pandas as pd

def add_one (_ls):
   return [ent+1 for ent in _ls]
ls = [1,2,3,4,5]
ls_plus_one = add_one(ls)
ls_plus_one = add_one(ls_plus_one)
ls_plus_one = add_one(ls_plus_one)
ls_plus_one = add_one(ls_plus_one)

In [25]:
ls_plus_one

[5, 6, 7, 8, 9]