In [1]:
from pyspark.sql import SparkSession

In [2]:
#Initializing Spark
spark = SparkSession.builder.getOrCreate()

24/07/31 10:16:50 WARN Utils: Your hostname, MacBook-Pro-de-Leonardo-2.local resolves to a loopback address: 127.0.0.1; using 192.0.0.2 instead (on interface en0)
24/07/31 10:16:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/31 10:16:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/31 10:16:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/07/31 10:16:51 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


***DataFrame Creation***

In [3]:
#DataFrame Creation on Spark
from datetime import date, datetime
import pandas as pd
from pyspark.sql import Row

In [4]:
#Simple Spark DataFrame
df = spark.createDataFrame([
    Row(a = 1, b = 2.5, c = 'String', d = date(2024, 1, 1), e = datetime(2024, 2, 5, 12, 0)),
    Row(a = 2, b = 3.2, c = 'String2', d = date(2024, 2, 10), e = datetime(2024, 2, 3, 11, 0)),
    Row(a = 3, b = 4.6, c = 'String3', d = date(2024, 3, 21), e = datetime(2024, 2, 1, 9, 0))
])
df.show()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.5| String|2024-01-01|2024-02-05 12:00:00|
|  2|3.2|String2|2024-02-10|2024-02-03 11:00:00|
|  3|4.6|String3|2024-03-21|2024-02-01 09:00:00|
+---+---+-------+----------+-------------------+



In [5]:
#Creating a Spark DataFrame based on a pandas DF
pandas_df = pd.DataFrame({
    'a':[1,2,3],
    'b':[2.5,6.6,7.9],
    'c':["string1", "string2", "string3"],
    'd':[date(2024, 1, 1), date(2024, 2, 5), date(2024, 7, 4)],
    'e':[datetime(2024, 1, 2, 9, 0), datetime(2024, 6, 7, 11, 0), datetime(2024, 4, 5, 0, 0)]
})
df = spark.createDataFrame(pandas_df)
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.5|string1|2024-01-01|2024-01-02 09:00:00|
|  2|6.6|string2|2024-02-05|2024-06-07 11:00:00|
|  3|7.9|string3|2024-07-04|2024-04-05 00:00:00|
+---+---+-------+----------+-------------------+



In [6]:
#See the Types of data in the dataframes
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



***Viewing Data***

In [7]:
#Vizualizing Specific Rows
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.5|string1|2024-01-01|2024-01-02 09:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



In [8]:
#Enabeling eagerEval for the eager evaluation of Pyspark DF in notebooks.
#With this configuration we can have a more familiar vizualization of the table
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.5,string1,2024-01-01,2024-01-02 09:00:00
2,6.6,string2,2024-02-05,2024-06-07 11:00:00
3,7.9,string3,2024-07-04,2024-04-05 00:00:00


In [9]:
#To have a vertical vizualization of the rows we can do:
df.show(2, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.5                 
 c   | string1             
 d   | 2024-01-01          
 e   | 2024-01-02 09:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 6.6                 
 c   | string2             
 d   | 2024-02-05          
 e   | 2024-06-07 11:00:00 
only showing top 2 rows



In [10]:
#To see the columns names we can do as follows:
df.columns

['a', 'b', 'c', 'd', 'e']

In [11]:
#To have a more Statistic view of the data we can use the descibe method
df.select('a', 'b', 'c').describe().show()

+-------+---+------------------+-------+
|summary|  a|                 b|      c|
+-------+---+------------------+-------+
|  count|  3|                 3|      3|
|   mean|2.0| 5.666666666666667|   NULL|
| stddev|1.0|2.8183919765237295|   NULL|
|    min|  1|               2.5|string1|
|    max|  3|               7.9|string3|
+-------+---+------------------+-------+



In [12]:
#Collecting data to the driver side as the local data in python:
df.collect()
#This is dangerous because if the data is too large it can cause a memory issue
#For that we can use the take or tail method to collect specific data
df.take(2)
df.tail(2)

[Row(a=2, b=6.6, c='string2', d=datetime.date(2024, 2, 5), e=datetime.datetime(2024, 6, 7, 11, 0)),
 Row(a=3, b=7.9, c='string3', d=datetime.date(2024, 7, 4), e=datetime.datetime(2024, 4, 5, 0, 0))]

In [13]:
#We can also convert a PySpark dataframe into a pandas one, but the same rule
#we saw above can affect us here, as the memory problem.
df.toPandas()
df

a,b,c,d,e
1,2.5,string1,2024-01-01,2024-01-02 09:00:00
2,6.6,string2,2024-02-05,2024-06-07 11:00:00
3,7.9,string3,2024-07-04,2024-04-05 00:00:00


In [14]:
df.a

Column<'a'>

***Operations With Columns***

In [15]:
#Operations with columns
from pyspark.sql import Column
from pyspark.sql.functions import upper

In [16]:
#Returning another dataframe based on the original, using .select()
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [17]:
#We can add a new Column instance, with .withColumn
df.withColumn('upper_c', upper(df.c)).show() # type: ignore
#To add this change to the dataframe we have to save the operation with:
    # df = df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.5|string1|2024-01-01|2024-01-02 09:00:00|STRING1|
|  2|6.6|string2|2024-02-05|2024-06-07 11:00:00|STRING2|
|  3|7.9|string3|2024-07-04|2024-04-05 00:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [18]:
#Filter rows, with .filter
#Here we dont have the column we created above because we didnt save it
df.filter(df.a == 2).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|6.6|string2|2024-02-05|2024-06-07 11:00:00|
+---+---+-------+----------+-------------------+



***Grouping Data***

In [19]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df

color,fruit,v1,v2
red,banana,1,10
blue,banana,2,20
red,carrot,3,30
blue,grape,4,40
red,carrot,5,50
black,carrot,6,60
red,banana,7,70
red,grape,8,80


In [20]:
#Groupby and Getting the avg
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [23]:
# #You can apply Python functions to the dataframe
# def plus_mean(pandas_df):
#     return pandas_df.assign(v1= pandas_df.v1 - pandas_df.v1.mean())

# df.groupby('color').applyInPandas(plus_mean, schema = df.schema).show()

***Getting Data***

In [27]:
#Getting data with CSV
df.write.csv('foo.csv', header=True)

In [28]:
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
| blue| grape|  4| 40|
|  red|carrot|  3| 30|
|  red|carrot|  5| 50|
|  red|banana|  1| 10|
|  red| grape|  8| 80|
+-----+------+---+---+



In [29]:
#Parquet
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

24/07/31 10:20:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/07/31 10:20:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/07/31 10:20:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|carrot|  5| 50|
| blue| grape|  4| 40|
|  red|banana|  7| 70|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red| grape|  8| 80|
+-----+------+---+---+



                                                                                

In [30]:
#ORC
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|carrot|  5| 50|
| blue| grape|  4| 40|
|  red|carrot|  3| 30|
+-----+------+---+---+



***Working with SQL***

In [33]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [37]:
# from pyspark.sql.functions import pandas_udf
# @pandas_udf("integer")
# def add_one(s: pd.Series) -> pd.Series:
#     return s + 1

# spark.udf.register("add_one", add_one)
# spark.sql("SELECT add_one(v1) FROM tableA").show()

In [54]:
pip install pyarrow==15.0.0

Collecting pyarrow==15.0.0
  Using cached pyarrow-15.0.0-cp312-cp312-macosx_11_0_arm64.whl.metadata (3.0 kB)
Using cached pyarrow-15.0.0-cp312-cp312-macosx_11_0_arm64.whl (24.2 MB)
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 17.0.0
    Uninstalling pyarrow-17.0.0:
      Successfully uninstalled pyarrow-17.0.0
Successfully installed pyarrow-15.0.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/bin/python -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [55]:
import os

os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'


In [56]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

24/07/31 10:39:18 ERROR Executor: Exception in task 0.0 in stage 79.0 (TID 301)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, re

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/jupyterlab/4.1.6_1/libexec/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'pandas'
