In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fnc
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StandardScaler

In [2]:
spark = SparkSession.builder.appName('Tutor_6_Preprocessing').getOrCreate()

In [3]:
tip_df = spark.read.csv('data/tips.csv',header=True, inferSchema=True)
tip_df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|   Yes|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



1. Explore data Structure & check Missing Values

In [12]:
tip_df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [4]:
list_cols = tip_df.columns
list_cols

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [5]:

list_filter_null = ["{} IS NULL".format(col) for col in list_cols]

In [6]:
counter_null =0
for col in list_filter_null:
    col_name = col.split(" ")[0]
    counter = tip_df.filter(col).count()
    counter_null+= counter
    print(f"{col_name} has {counter} null values")

total_bill has 0 null values
tip has 0 null values
sex has 0 null values
smoker has 0 null values
day has 0 null values
time has 0 null values
size has 0 null values


2. Pre-processing by:
- MinMaxScaler
- StandardScaler

Q1: Describe the difference from Total_bill & tip

Step 1: Select specified columns and assemble them
Step 2: Use transform() to create new DataFrame with transformed data 

In [7]:
bill_tip = VectorAssembler(inputCols=['total_bill','tip'], outputCol='bill_tip_feature')

In [8]:
tip_transformed = bill_tip.transform(tip_df)

In [9]:
tip_transformed.show(5)

+----------+----+------+------+---+------+----+----------------+
|total_bill| tip|   sex|smoker|day|  time|size|bill_tip_feature|
+----------+----+------+------+---+------+----+----------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|    [16.99,1.01]|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|    [10.34,1.66]|
|     21.01| 3.5|  Male|   Yes|Sun|Dinner|   3|     [21.01,3.5]|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|    [23.68,3.31]|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|    [24.59,3.61]|
+----------+----+------+------+---+------+----+----------------+
only showing top 5 rows



Step 3. Use pyspark.ml.feature.MinMaxScaler
- Initial a instance of MinMaxScaler()
- Use fit() to calculate min/ max value in 'bill_tip_feature' from 'tip_transformed'
- Apply transform() calculate min-max formula with each value on 'bill_tip_feature'
- Store transfered value in 'min_max_scaler' column

Min-Max formula:
x scaled = (x - min)/(max - min)

In [10]:
minMaxScal = MinMaxScaler(inputCol='bill_tip_feature', outputCol='min_max_scaled')

In [11]:
output = minMaxScal.fit(tip_transformed).transform(tip_transformed)

In [37]:
tip_transformed.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)
 |-- bill_tip_feature: vector (nullable = true)



In [38]:
from pyspark.sql.types import ArrayType, DoubleType

def round_array(arr):
    return [round(x, 3) for x in arr]

# Create a new user-defined function
round_udf = fnc.udf(round_array, ArrayType(DoubleType()))

In [39]:
rounded_output = output.select(
    round_udf('min_max_scaled').alias('rounded_scaler'))

In [41]:
rounded_output.columns
rounded_output.show(truncate=False, vertical=True)

Py4JJavaError: An error occurred while calling o242.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 56.0 failed 1 times, most recent failure: Lost task 0.0 in stage 56.0 (TID 82) (DESKTOP-198IC2R executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$2(BatchEvalPythonExec.scala:67)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$2(BatchEvalPythonExec.scala:67)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [33]:
output.show(n=5, truncate=False)

+----------+----+------+------+---+------+----+----------------+-----------------------------------------+
|total_bill|tip |sex   |smoker|day|time  |size|bill_tip_feature|min_max_scaled                           |
+----------+----+------+------+---+------+----+----------------+-----------------------------------------+
|16.99     |1.01|Female|No    |Sun|Dinner|2   |[16.99,1.01]    |[0.2915793883535818,0.001111111111111112]|
|10.34     |1.66|Male  |No    |Sun|Dinner|3   |[10.34,1.66]    |[0.15228320067029744,0.07333333333333332]|
|21.01     |3.5 |Male  |Yes   |Sun|Dinner|3   |[21.01,3.5]     |[0.3757855048177629,0.2777777777777778]  |
|23.68     |3.31|Male  |No    |Sun|Dinner|2   |[23.68,3.31]    |[0.43171344784248006,0.25666666666666665]|
|24.59     |3.61|Female|No    |Sun|Dinner|4   |[24.59,3.61]    |[0.45077503142019265,0.29]               |
+----------+----+------+------+---+------+----+----------------+-----------------------------------------+
only showing top 5 rows



In [31]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ShowExample").getOrCreate()

# Create a simple DataFrame
data = [(1, "Alice", [0.123456789, 1.987654321]),
        (2, "Bob", [2.234567890, 3.876543210])]
columns = ["ID", "Name", "Scores"]

df = spark.createDataFrame(data, schema=columns)

rounded_rs = df.select(round_udf('Scores').alias('round_score'))
rounded_rs.show(truncate=False)
# Display the DataFrame with different options
df.show(truncate=False)  # Full view without truncation
# df.show(n=5, vertical=True)  # Vertical display

+--------------+
|round_score   |
+--------------+
|[0.123, 1.988]|
|[2.235, 3.877]|
+--------------+

+---+-----+--------------------------+
|ID |Name |Scores                    |
+---+-----+--------------------------+
|1  |Alice|[0.123456789, 1.987654321]|
|2  |Bob  |[2.23456789, 3.87654321]  |
+---+-----+--------------------------+

