In [None]:
# pip install pyspark


In [1]:
# pip install findspark


Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
    sys-platform (=="darwin") ; extra == 'objc'
                 ~^[0m[33m
[0mInstalling collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
!pyspark --version


25/03/01 16:06:51 WARN Utils: Your hostname, liltimz resolves to a loopback address: 127.0.1.1; using 172.29.249.245 instead (on interface eth0)
25/03/01 16:06:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/
                        
Using Scala version 2.13.8, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user centos on 2023-06-19T22:21:01Z
Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
Url https://github.com/apache/spark
Type --help for more information.


In [4]:
import pyspark
print(pyspark.__version__)


3.4.1


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

25/03/07 16:54:49 WARN Utils: Your hostname, liltimz resolves to a loopback address: 127.0.1.1; using 172.29.249.245 instead (on interface eth0)
25/03/07 16:54:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 16:55:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# create a dataframe with an array column

In [13]:

#create a dataframe with an array column
df = spark.createDataFrame(
        [("abc", [1, 2]), ("cd", [3, 4])], ["id", "numbers"]
) 
df.show()

+---+-------+
| id|numbers|
+---+-------+
|abc| [1, 2]|
| cd| [3, 4]|
+---+-------+



In [15]:
# Print the schema of the DataFrame to verify that the numbers column is an array.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- numbers: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [16]:
df = spark.createDataFrame(
    [("abc", [1, 2]), ("def", [3, 4])], ["id", "numbers"]
)
df.show()



+---+-------+
| id|numbers|
+---+-------+
|abc| [1, 2]|
|def| [3, 4]|
+---+-------+



                                                                                

# creating DF with StructType syntax

In [22]:
# creating DF with StructType syntax
from pyspark.sql.types import *
from pyspark.sql import Row

schema = StructType([
    StructField("id", StringType(), True),
    StructField("numbers", ArrayType(IntegerType()), True)
])

# Create sample data that matches the schema
data = [
    ("a", [1, 2, 3]),
    ("b", [4, 5, 6])
]
df = spark.createDataFrame(data, schema)

df.show()

+---+---------+
| id|  numbers|
+---+---------+
|  a|[1, 2, 3]|
|  b|[4, 5, 6]|
+---+---------+





In [28]:
from pyspark.sql.functions import col

# Add column "first_number" to the DF that returns the first element in the numbers array

In [29]:
# Add column "first_number" to the DF that returns the first element in the numbers array
# df.withColumn("first_number", df.numbers[2]).show()
# Note: withColumn function requires two arguments: the new column name and the column expression that defines its values
df.withColumn("first_number", col("numbers")[0]).show()

+---+---------+------------+
| id|  numbers|first_number|
+---+---------+------------+
|  a|[1, 2, 3]|           1|
|  b|[4, 5, 6]|           4|
+---+---------+------------+



# combine columns to array

In [31]:
df = spark.createDataFrame(
    [(33, 44), (55, 66)], ["num1", "num2"]
)
df.show()

+----+----+
|num1|num2|
+----+----+
|  33|  44|
|  55|  66|
+----+----+



# Add another column "nums" which is an array that contains num1 and num2 above

In [34]:
from pyspark.sql.functions import *
df.withColumn("nums", array(df.num1, df.num2)).show()

+----+----+--------+
|num1|num2|    nums|
+----+----+--------+
|  33|  44|[33, 44]|
|  55|  66|[55, 66]|
+----+----+--------+



# LIST AGGREGATION

In [36]:
# Create a DF with "first_name" and "color" columns that shows individuals and their colors

df = spark.createDataFrame(
    [("joe", "red"), ("joe", "blue"), ("Tim", "black")], ["first_name", "color"]
)

df.show()

[Stage 24:>                                                         (0 + 1) / 1]                                                                                

+----------+-----+
|first_name|color|
+----------+-----+
|       joe|  red|
|       joe| blue|
|       Tim|black|
+----------+-----+



In [38]:
# Group by first_name, create an ArrayType column with all colors given first_name
res = (df
      .groupBy(df.first_name)
      .agg(collect_list(col("color")).alias("colors")))

res.show()



+----------+-----------+
|first_name|     colors|
+----------+-----------+
|       joe|[red, blue]|
|       Tim|    [black]|
+----------+-----------+



                                                                                

In [40]:
res.printSchema()
# collect_list shows that some of Spark's API methods take advantage of ArrayType columns as well.

root
 |-- first_name: string (nullable = true)
 |-- colors: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [42]:
# A PySpark array can be exploded into multiple rows, the opposite of collect_list.

df = spark.createDataFrame(
    [("abc", [1, 2]), ("def", [3, 4]), ("ghi", [5, 6])], ["id", "numbers"]
)
df.show()

+---+-------+
| id|numbers|
+---+-------+
|abc| [1, 2]|
|def| [3, 4]|
|ghi| [5, 6]|
+---+-------+



In [43]:
# Explode the array column, so there is only one number per DataFrame row.

In [45]:
df.select(col("id"), explode(col("numbers")).alias("number")).show()
# collect_list collapses multiple rows into a single row. explode does the opposite and expands an array into multiple rows.

+---+------+
| id|number|
+---+------+
|abc|     1|
|abc|     2|
|def|     3|
|def|     4|
|ghi|     5|
|ghi|     6|
+---+------+



# Writing to files

In [50]:
# You can write DataFrames with array columns to Parquet files without issue.
# df = spark.createDataFrame(
#     [("abc", [1, 2]), ("cd", [3, 4])], ["id", "numbers"]
# )

# parquet_path = "/Users/Liltimz/Desktop/slack/pyspark"
# df.write.parquet(parquet_path)

In [53]:
# NOTE: You cannot write DataFrames with array columns to CSV files:
# This isn't a limitation of Spark - it's a limitation of the CSV file format. CSV files can't handle complex column types like arrays. Parquet files are able to handle complex columns.

# Type conversions

In [54]:
# create a DataFrame with an integer column and a string column to demonstrate the surprising type conversion that takes place when different types are combined in a PySpark array
df = spark.createDataFrame(
    [("a", 8), ("b", 9)], ["letter", "number"]
)
df.show()

                                                                                

+------+------+
|letter|number|
+------+------+
|     a|     8|
|     b|     9|
+------+------+



In [56]:
# Combine the letter and number columns into an array "arr" and then fetch the number from the array
res = (df
       .withColumn("arr", array(df.letter, df.number))
       .withColumn("number2", col("arr")[1]))
res.show()



+------+------+------+-------+
|letter|number|   arr|number2|
+------+------+------+-------+
|     a|     8|[a, 8]|      8|
|     b|     9|[b, 9]|      9|
+------+------+------+-------+



In [57]:
# Print the schema to observe the number2 column is string type.
res.printSchema()

root
 |-- letter: string (nullable = true)
 |-- number: long (nullable = true)
 |-- arr: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- number2: string (nullable = true)



In [59]:
# Regular Python lists can hold values with different types. my_arr = [1, "a"] is valid in Python.

# PySpark arrays can only hold one type. In order to combine letter and number in an array, PySpark needs to convert number to a string.

# PySpark's type conversion causes you to lose valuable type information. It's arguable that the array function should error out when joining columns with different types, rather than implicitly converting types.

# It's best for you to explicitly convert types when combining different types into a PySpark array rather than relying on implicit conversions.

# Avoiding Dots / Periods in PySpark Column Names

In [4]:
# Let's create a DataFrame with country.name and continent columns.
df = spark.createDataFrame(
    [("china", "asia"), ("colombia", "souh america")], ["country.name", "continent"])
df.show()

                                                                                

+------------+------------+
|country.name|   continent|
+------------+------------+
|       china|        asia|
|    colombia|souh america|
+------------+------------+



In [6]:
# df.select("country.name")
# this will give an error

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, Row

schema = StructType([
    StructField("person.name", StringType(), True),
    StructField("person", StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)]))
])
data = [
    ("charles", Row("chuck", 42)),
    ("lawrence", Row("larry", 73))
]
df = spark.createDataFrame(data, schema)
df.show()



+-----------+-----------+
|person.name|     person|
+-----------+-----------+
|    charles|{chuck, 42}|
|   lawrence|{larry, 73}|
+-----------+-----------+



                                                                                

In [10]:
cols = ["person", "person.name", "`person.name`"]
df.select(cols).show()

+-----------+-----+-----------+
|     person| name|person.name|
+-----------+-----+-----------+
|{chuck, 42}|chuck|    charles|
|{larry, 73}|larry|   lawrence|
+-----------+-----+-----------+



In [11]:
# Replaces dots with underscores in column names
clean_df = df.toDF(*(c.replace('.', '_') for c in df.columns))
clean_df.show()

[Stage 7:>                                                          (0 + 3) / 3]

+-----------+-----------+
|person_name|     person|
+-----------+-----------+
|    charles|{chuck, 42}|
|   lawrence|{larry, 73}|
+-----------+-----------+





In [12]:
clean_df.select("person_name", "person.name", "person.age").show()

+-----------+-----+---+
|person_name| name|age|
+-----------+-----+---+
|    charles|chuck| 42|
|   lawrence|larry| 73|
+-----------+-----+---+



