In [90]:
import pyspark
import os

# Create SparkSession
sparkql = pyspark.sql.SparkSession.builder.master('local').getOrCreate()

# Create path and file variables
data_dir = './data'
coffee_data = 'coffee.csv'

# Create coffee PySpark DataFrame
coffee_df = sparkql.read.csv(os.path.join(data_dir, coffee_data), header=True)

# Show columns, schema, and df
print(coffee_df.columns)
print(coffee_df.schema)
coffee_df.show(4)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Currency']
StructType([StructField('Date', StringType(), True), StructField('Open', StringType(), True), StructField('High', StringType(), True), StructField('Low', StringType(), True), StructField('Close', StringType(), True), StructField('Volume', StringType(), True), StructField('Currency', StringType(), True)])
+----------+------+-----+------+------+------+--------+
|      Date|  Open| High|   Low| Close|Volume|Currency|
+----------+------+-----+------+------+------+--------+
|2000-01-03|122.25|124.0| 116.1| 116.5|  6640|     USD|
|2000-01-04|116.25|120.5|115.75|116.25|  5492|     USD|
|2000-01-05| 115.0|121.0| 115.0| 118.6|  6165|     USD|
|2000-01-06| 119.0|121.4| 116.5|116.85|  5094|     USD|
+----------+------+-----+------+------+------+--------+
only showing top 4 rows



In [91]:
# Change the data types of columns
from pyspark.sql.functions import to_date
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

# Cast Date column as DateType with to_date method
coffee_df = coffee_df.withColumn('Date', to_date(coffee_df['Date'], format='yyyy-MM-dd'))
# Make sure date is properly formatted
coffee_df.select('Date').show(4)

# Cast other columns as FloatType
coffee_df = coffee_df.withColumn('Open', coffee_df['Open'].cast(FloatType()))
coffee_df = coffee_df.withColumn('High', coffee_df['High'].cast(FloatType()))
coffee_df = coffee_df.withColumn('Low', coffee_df['Low'].cast(FloatType()))
coffee_df = coffee_df.withColumn('Close', coffee_df['Close'].cast(FloatType()))

coffee_df = coffee_df.withColumn('Volume', coffee_df['Volume'].cast(IntegerType()))

print(coffee_df.schema)

+----------+
|      Date|
+----------+
|2000-01-03|
|2000-01-04|
|2000-01-05|
|2000-01-06|
+----------+
only showing top 4 rows

StructType([StructField('Date', DateType(), True), StructField('Open', FloatType(), True), StructField('High', FloatType(), True), StructField('Low', FloatType(), True), StructField('Close', FloatType(), True), StructField('Volume', IntegerType(), True), StructField('Currency', StringType(), True)])


In [92]:
from pyspark.sql.functions import round

# Create High-Low and Open-Close difference Columns
coffee_df = coffee_df.withColumn('Open_Close_Diff', round(coffee_df.Open - coffee_df.Close, 2))
coffee_df = coffee_df.withColumn('High_Low_Diff', round(coffee_df.High - coffee_df.Low, 2))
coffee_df.show(4)

+----------+------+-----+------+------+------+--------+---------------+-------------+
|      Date|  Open| High|   Low| Close|Volume|Currency|Open_Close_Diff|High_Low_Diff|
+----------+------+-----+------+------+------+--------+---------------+-------------+
|2000-01-03|122.25|124.0| 116.1| 116.5|  6640|     USD|           5.75|          7.9|
|2000-01-04|116.25|120.5|115.75|116.25|  5492|     USD|            0.0|         4.75|
|2000-01-05| 115.0|121.0| 115.0| 118.6|  6165|     USD|           -3.6|          6.0|
|2000-01-06| 119.0|121.4| 116.5|116.85|  5094|     USD|           2.15|          4.9|
+----------+------+-----+------+------+------+--------+---------------+-------------+
only showing top 4 rows



In [93]:
from pyspark.sql.functions import lit, when

# Create Boolean column based on Volume values
coffee_df = coffee_df.withColumn('volume_filter_100', when(coffee_df.Volume >= 100, lit(True)).otherwise(lit(False)))

# Making sure column was properly created
coffee_df.filter(coffee_df.volume_filter_100 == True).show(5)
coffee_df.filter(coffee_df.volume_filter_100 == False).show(10)

+----------+------+------+------+------+------+--------+---------------+-------------+-----------------+
|      Date|  Open|  High|   Low| Close|Volume|Currency|Open_Close_Diff|High_Low_Diff|volume_filter_100|
+----------+------+------+------+------+------+--------+---------------+-------------+-----------------+
|2000-01-03|122.25| 124.0| 116.1| 116.5|  6640|     USD|           5.75|          7.9|             true|
|2000-01-04|116.25| 120.5|115.75|116.25|  5492|     USD|            0.0|         4.75|             true|
|2000-01-05| 115.0| 121.0| 115.0| 118.6|  6165|     USD|           -3.6|          6.0|             true|
|2000-01-06| 119.0| 121.4| 116.5|116.85|  5094|     USD|           2.15|          4.9|             true|
|2000-01-07|117.25|117.75| 113.8|114.15|  6855|     USD|            3.1|         3.95|             true|
+----------+------+------+------+------+------+--------+---------------+-------------+-----------------+
only showing top 5 rows

+----------+------+------+----

In [94]:
# Create Absolute Value column
from pyspark.sql.functions import abs

coffee_df = coffee_df.withColumn('Open_Close_Abs', abs(coffee_df.Open_Close_Diff))

# Show Absolute Value column
coffee_df.select('Open_Close_Abs').show(10)

+--------------+
|Open_Close_Abs|
+--------------+
|          5.75|
|           0.0|
|           3.6|
|          2.15|
|           3.1|
|          5.95|
|           2.3|
|          1.15|
|           0.7|
|           5.2|
+--------------+
only showing top 10 rows



In [95]:
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

# Create udf
@pandas_udf(FloatType())
def compute_net(c1: pd.Series,
                c2: pd.Series,
                c3: pd.Series, 
                c4: pd.Series, 
                c5: pd.Series) -> FloatType:
  return ((c1 + c2 + c3 + c4) / 4) * c5

# Create Winow
w = Window \
    .partitionBy('Open', 'High', 'Close', 'Low') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Create net_values column
coffee_df = coffee_df.withColumn('net_sales', compute_net(coffee_df.Open, coffee_df.High, coffee_df.Low, coffee_df.Close, coffee_df.Volume).over(w))

# Make sure net_values column is there 
coffee_df.select('net_sales').show(10)

[Stage 149:>                                                        (0 + 1) / 1]

23/02/13 05:09:08 ERROR Executor: Exception in task 0.0 in stage 149.0 (TID 111)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 0    1838.100067
dtype: float64 with type Series: tried to convert to float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterat

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 0    1838.100067
dtype: float64 with type Series: tried to convert to float32


In [102]:
from pyspark.sql.functions import avg
from pyspark.sql.functions import max

# Find stats
coffee_df.select(round(avg('Open_Close_Abs'), 2)).show()
print(coffee_df.select(coffee_df.Volume).where(coffee_df.Volume < 100).count())
coffee_df.select(round(avg('Open'), 2)).show()
coffee_df.select(max(coffee_df.High)).show()

+-----------------------------+
|round(avg(Open_Close_Abs), 2)|
+-----------------------------+
|                         1.76|
+-----------------------------+

1638
+-------------------+
|round(avg(Open), 2)|
+-------------------+
|             126.05|
+-------------------+

+---------+
|max(High)|
+---------+
|   306.25|
+---------+



In [120]:
import sys
import os

# Write df into paraquet file
dir_name = sys.path[0]
data_dir = os.path.join(dir_name, 'data')
print(dir_name)
print(data_dir)
coffee_parquet = os.path.join(data_dir, 'coffee_df.paraquet')
print(coffee_parquet)

coffee_df.write.mode('overwrite').parquet(coffee_parquet)

/mnt/c/Users/User/Documents/Epicodus/Code-Reviews/spark
/mnt/c/Users/User/Documents/Epicodus/Code-Reviews/spark/data
/mnt/c/Users/User/Documents/Epicodus/Code-Reviews/spark/data/coffee_df.paraquet


[Stage 248:>                                                        (0 + 1) / 1]

23/02/13 07:42:14 ERROR Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 0    1838.100067
dtype: float64 with type Series: tried to convert to float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.sca

Py4JJavaError: An error occurred while calling o1735.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 248.0 failed 1 times, most recent failure: Lost task 0.0 in stage 248.0 (TID 177) (172.20.88.132 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 0    1838.100067
dtype: float64 with type Series: tried to convert to float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
	... 42 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 0    1838.100067
dtype: float64 with type Series: tried to convert to float32

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338)
	... 9 more
