## Create SparkSession and import

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import Window

sc = SparkContext('local')
spark = SparkSession(sc)

24/01/03 12:12:19 WARN Utils: Your hostname, krxps resolves to a loopback address: 127.0.1.1; using 192.168.68.61 instead (on interface wlp0s20f3)
24/01/03 12:12:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/01/03 12:12:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Create a dataframe (with SparkSession)

In [5]:
df = spark.createDataFrame([(1,), (2,)], "id: int")
df.show()

df.createOrReplaceTempView("test")

df_table = spark.table("test")
df_table.show()

df_sql = spark.sql("select id from test where id = 2")
df_sql.show()

df_range = spark.range(0, 3)
df_range.show()

+---+
| id|
+---+
|  1|
|  2|
+---+

+---+
| id|
+---+
|  1|
|  2|
+---+

+---+
| id|
+---+
|  2|
+---+

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



## Filter

In [12]:
df = spark.createDataFrame([(1, "a"), (2, "b")])

df.filter((col("_1") == 1) | (col("_2") == "a")).take(5)

df.filter("_1 == 1 or _2 == 'a'").take(5)

[Row(_1=1, _2='a')]

## Create column / Math function / Literal

In [13]:
df.withColumn("squared", pow("_1", lit(2))).show()

df.withColumn("squared", pow("_1", 2)).show()

+---+---+-------+
| _1| _2|squared|
+---+---+-------+
|  1|  a|    1.0|
|  2|  b|    4.0|
+---+---+-------+

+---+---+-------+
| _1| _2|squared|
+---+---+-------+
|  1|  a|    1.0|
|  2|  b|    4.0|
+---+---+-------+



In [31]:
df = spark.createDataFrame([(10, "a"), (20, "b"), (30, "c"), (40, "d"), (40, "e")], ["id", "name"])

df.select((col("id").between(20, 40) & col("name").isin("b", "c", "d")).alias("boolean")).show()


+-------+
|boolean|
+-------+
|  false|
|   true|
|   true|
|   true|
|  false|
+-------+



# Arrays

In [26]:
(df
 .withColumn("test", lit("a_b"))
 .withColumn("array", split(col("test"), "_"))
 .withColumn("second_element", col("array")[1])
 .withColumn("sorted", sort_array("array", asc=False))
 .withColumn("size", size("array"))
 .withColumn("contains", array_contains("array", "c"))
 .show()
)

+---+---+----+------+--------------+------+----+--------+
|  a|  b|test| array|second_element|sorted|size|contains|
+---+---+----+------+--------------+------+----+--------+
|  2| 20| a_b|[a, b]|             b|[b, a]|   2|   false|
|  3| 30| a_b|[a, b]|             b|[b, a]|   2|   false|
|  1| 10| a_b|[a, b]|             b|[b, a]|   2|   false|
|  1| 30| a_b|[a, b]|             b|[b, a]|   2|   false|
|  1| 20| a_b|[a, b]|             b|[b, a]|   2|   false|
+---+---+----+------+--------------+------+----+--------+



In [38]:
(df.withColumn("test", lit("a_b"))
 .withColumn("array", split("test", "_"))
 .withColumn("yo", explode(col("array")))
 .show()
)

+---+---+----+------+---+
| _1| _2|test| array| yo|
+---+---+----+------+---+
|  1|  a| a_b|[a, b]|  a|
|  1|  a| a_b|[a, b]|  b|
|  2|  b| a_b|[a, b]|  a|
|  2|  b| a_b|[a, b]|  b|
+---+---+----+------+---+



# Manpulate strings

In [4]:
df.withColumn("upper", upper(col("_2"))).show()

+---+---+-----+
| _1| _2|upper|
+---+---+-----+
|  1|  a|    A|
|  2|  b|    B|
+---+---+-----+



In [40]:
(df.withColumn("description", lit("Description: bla bla"))
 .withColumn("result", regexp_replace("description", "^Description: ", ""))
 .show()
)

+---+---+--------------------+-------+
| _1| _2|         description| result|
+---+---+--------------------+-------+
|  1|  a|Description: bla bla|bla bla|
|  2|  b|Description: bla bla|bla bla|
+---+---+--------------------+-------+



# Rename a column

In [43]:
(df.withColumnRenamed("_1", "id")
 .withColumn("name", col("_2"))
 .show()
)

+---+---+----+
| id| _2|name|
+---+---+----+
|  1|  a|   a|
|  2|  b|   b|
+---+---+----+



# Remove duplicates

In [22]:
df_with_duplicates = df.withColumn("test", lit("yo"))
df_with_duplicates.show()

df_with_duplicates.drop_duplicates(subset = ["test"]).show()

df_with_duplicates.select("test").distinct().show()

+---+---+----+
|  a|  b|test|
+---+---+----+
|  2| 20|  yo|
|  3| 30|  yo|
|  1| 10|  yo|
|  1| 30|  yo|
|  1| 20|  yo|
+---+---+----+

+---+---+----+
|  a|  b|test|
+---+---+----+
|  2| 20|  yo|
+---+---+----+

+----+
|test|
+----+
|  yo|
+----+



## Aggregations

In [45]:
df.agg(mean(col("_1")).alias("mean")).show()

+----+
|mean|
+----+
| 1.5|
+----+



In [40]:
df.agg(approx_count_distinct(col("_1"), 0.15).alias("distincCount")).show()

+------------+
|distincCount|
+------------+
|           2|
+------------+



In [28]:
df = spark.createDataFrame([("A", 3), ("B", 30), ("B", 15), ("A", 100)], ["id", "value"])

df.groupBy("id").agg(max("value").alias("highest"), min("value").alias("lowest")).show()

+---+-------+------+
| id|highest|lowest|
+---+-------+------+
|  B|     30|    15|
|  A|    100|     3|
+---+-------+------+



In [4]:
df1 = spark.createDataFrame([
    Row(course="dotNET", year=2012, earnings=10000),
    Row(course="Java", year=2012, earnings=20000),
    Row(course="dotNET", year=2012, earnings=5000),
    Row(course="dotNET", year=2013, earnings=48000),
    Row(course="Java", year=2013, earnings=30000),
])
df1.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()


[Stage 0:>                                                          (0 + 1) / 1]

+----+------+-----+
|year|dotNET| Java|
+----+------+-----+
|2012| 15000|20000|
|2013| 48000|30000|
+----+------+-----+



                                                                                

## Window

In [2]:
d = [
    {'name': 'Alice', 'age': 40, 'country': 'France', 'date': "2023-10-01"},
    {'name': 'Jane', 'age': 28, 'country': 'France', 'date': "2023-10-01"},
    {'name': 'Bob', 'age': 30, 'country': 'France', 'date': "2023-10-10"},
    {'name': 'Richard', 'age': 50, 'country': 'Allemagne', 'date': "2023-10-10"},
    {'name': 'Omar', 'age': 20, 'country': 'Italie', 'date': "2023-10-26"}
]

df = spark.createDataFrame(d).withColumn("date", to_date("date"))

window = (Window
          .partitionBy("country", "date")
          .orderBy(desc("age"))
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn("rank", rank().over(window)).show()

[Stage 0:>                                                          (0 + 1) / 1]

+---+---------+----------+-------+----+
|age|  country|      date|   name|rank|
+---+---------+----------+-------+----+
| 50|Allemagne|2023-10-10|Richard|   1|
| 40|   France|2023-10-01|  Alice|   1|
| 28|   France|2023-10-01|   Jane|   2|
| 30|   France|2023-10-10|    Bob|   1|
| 20|   Italie|2023-10-26|   Omar|   1|
+---+---------+----------+-------+----+



                                                                                

# Sort

In [7]:
df = spark.createDataFrame([(2, 20), (3, 30), (1, 10), (1, 30), (1, 20)], ["a", "b"])

assert df.orderBy("a", ascending=False).select("a").first().a == 3
assert df.sort(col("a").desc()).first().b == 30
assert df.sort(desc("a")).first().a == 3
assert df.sort("a", desc("b")).first() == Row(1, 30)

+---+---+
|  a|  b|
+---+---+
|  1| 30|
|  1| 20|
|  1| 10|
|  2| 20|
|  3| 30|
+---+---+



# Describe dataframe

In [8]:
df.describe("_1").show()

+-------+------------------+
|summary|                _1|
+-------+------------------+
|  count|                 2|
|   mean|               1.5|
| stddev|0.7071067811865476|
|    min|                 1|
|    max|                 2|
+-------+------------------+



# Sample

In [72]:
integers = [1, 2, 2, 3, 4, 4, 5]
dfInt = spark.createDataFrame(integers, IntegerType())

assert [row.value for row in dfInt.sample(True, fraction=0.5, seed=3).collect()] == [4, 5]
assert [row.value for row in dfInt.sample(False, fraction=0.5, seed=3).collect()] == [1, 4, 4]

# Dates

In [16]:
df = spark.createDataFrame([(1, "a"), (2, "b")])
(
    df.withColumn("test", lit(1408024997).cast("timestamp"))
    .withColumn("month", month(col("test")))
    .withColumn("day_of_year", dayofyear(col("test")))
    .withColumn("yo", from_unixtime(lit(1408024997), "EEEE, MMM d, yyyy h:mm a"))
    .withColumn("yo_timestamp", unix_timestamp(lit("02/01/2024 10:48"), "dd/MM/yyyy HH:mm"))
    .show(10, False)
)

+---+---+-------------------+-----+-----------+------------------------------+------------+
|_1 |_2 |test               |month|day_of_year|yo                            |yo_timestamp|
+---+---+-------------------+-----+-----------+------------------------------+------------+
|1  |a  |2014-08-14 16:03:17|8    |226        |Thursday, Aug 14, 2014 4:03 PM|1704188880  |
|2  |b  |2014-08-14 16:03:17|8    |226        |Thursday, Aug 14, 2014 4:03 PM|1704188880  |
+---+---+-------------------+-----+-----------+------------------------------+------------+



In [None]:
dfDates = spark.createDataFrame([("23/01/2022 11:28:12",),("24/01/2022 10:58:34",)], ["date"])
dfDates = dfDates.withColumn("date", to_timestamp("date", "dd/MM/yyyy HH:mm:ss"))

dfDates.show()
dfDates.printSchema()

# Fill with empty values (na.fill)

In [31]:
(
    df.withColumn("test_na_1", lit(None).cast(StringType()))
    .withColumn("test_na_2", lit(None).cast(IntegerType()))
    .withColumn("test_na_3", lit(None).cast(IntegerType()))
    .withColumn("test_na_4", lit(None).cast(IntegerType()))
    .na.fill("yo")
    .na.fill({'test_na_2': 30})
    .na.fill(5, "test_na_4")
    .show()
)

+---+---+---------+---------+---------+---------+
| _1| _2|test_na_1|test_na_2|test_na_3|test_na_4|
+---+---+---------+---------+---------+---------+
|  1|  a|       yo|       30|     null|        5|
|  2|  b|       yo|       30|     null|        5|
+---+---+---------+---------+---------+---------+



# Joins

In [7]:
a = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Rachid", 3)], "name:string, id: int")
b = spark.createDataFrame([("Franck", 3, 42), ("Bernard", 4, 42), ("Ramzy", 5, 43)], "name:string, id: int, dept_id: int")

a.crossJoin(b).show()

a.join(b, a.id == b.id).show()
a.join(b, [a.id == b.id])
a.alias("a").join(b.alias("b"), [col("a.id") == col("b.id"), col("a.name") == col("b.name")])
a.join(b.select("dept_id"), col("id") == col("dept_id"))

a.join(b, ["id", "name"], "outer").show()

+------+---+-------+---+-------+
|  name| id|   name| id|dept_id|
+------+---+-------+---+-------+
| Alice|  1| Franck|  3|     42|
| Alice|  1|Bernard|  4|     42|
| Alice|  1|  Ramzy|  5|     43|
|   Bob|  2| Franck|  3|     42|
|   Bob|  2|Bernard|  4|     42|
|   Bob|  2|  Ramzy|  5|     43|
|Rachid|  3| Franck|  3|     42|
|Rachid|  3|Bernard|  4|     42|
|Rachid|  3|  Ramzy|  5|     43|
+------+---+-------+---+-------+

+------+---+------+---+-------+
|  name| id|  name| id|dept_id|
+------+---+------+---+-------+
|Rachid|  3|Franck|  3|     42|
+------+---+------+---+-------+

+---+-------+-------+
| id|   name|dept_id|
+---+-------+-------+
|  1|  Alice|   null|
|  2|    Bob|   null|
|  3| Franck|     42|
|  3| Rachid|   null|
|  4|Bernard|     42|
|  5|  Ramzy|     43|
+---+-------+-------+



In [133]:
small_df = spark.createDataFrame([1, 2, 3], IntegerType())
large_df = spark.createDataFrame([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], IntegerType())

large_df.join(broadcast(small_df), "v").show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+



# Union

In [18]:
a = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Rachid", 3)], "name:string, id: int")
b = spark.createDataFrame([("Franck", 30), ("Bernard", 40), ("Ramzy", 50)], "name:string, age: int")

a.union(b).show()
a.unionByName(b, allowMissingColumns=True).show()

+-------+---+
|   name| id|
+-------+---+
|  Alice|  1|
|    Bob|  2|
| Rachid|  3|
| Franck| 30|
|Bernard| 40|
|  Ramzy| 50|
+-------+---+

+-------+----+----+
|   name|  id| age|
+-------+----+----+
|  Alice|   1|null|
|    Bob|   2|null|
| Rachid|   3|null|
| Franck|null|  30|
|Bernard|null|  40|
|  Ramzy|null|  50|
+-------+----+----+

+-------+
|   name|
+-------+
|  Alice|
|    Bob|
| Rachid|
| Franck|
|Bernard|
|  Ramzy|
+-------+



# Cast

In [25]:
df.withColumn("test", col("_1").cast(StringType())).printSchema()
df.withColumn("test", col("_1").cast("string")).printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- test: string (nullable = true)

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- test: string (nullable = true)



# UDFs

In [75]:
def add42(n):
    return n + 42
    
df.createOrReplaceTempView("test")

spark.udf.register("ADD_42", add42)

spark.sql("select ADD_42(_1), _1 from test").show()

23/12/13 22:06:01 WARN SimpleFunctionRegistry: The function add_42 replaced a previously registered function.
+----------+---+
|ADD_42(_1)| _1|
+----------+---+
|        43|  1|
|        44|  2|
+----------+---+



In [76]:
def multiply_by_10(n):
    return n * 10

multiply_by_10_UDF = udf(multiply_by_10, IntegerType())

df.withColumn("test", multiply_by_10_UDF("_1")).show()

+---+---+----+
| _1| _2|test|
+---+---+----+
|  1|  a|  10|
|  2|  b|  20|
+---+---+----+



# Cache and Persist

In [11]:
from pyspark import StorageLevel

# cache in MEMORY_AND_DISK_DESER (default)
df.cache().count()
assert str(df.storageLevel).startswith("Disk") == True
print(df.storageLevel)

df.unpersist()

# cache in MEMORY_ONLY
df.persist(StorageLevel.MEMORY_ONLY)
assert str(df.storageLevel).startswith("Memory") == True
print(df.storageLevel)

# stores dataframe on two different executors, utilizing the executors' memory as much as possible, but not writing anything to disk.
df.persist(StorageLevel.MEMORY_ONLY_2).count()

df.unpersist()

assert df.is_cached == False

Disk Memory Deserialized 1x Replicated
Memory Serialized 1x Replicated
23/12/22 22:11:11 WARN CacheManager: Asked to cache already cached data.


# Write dataframe

In [154]:
(
    df
    .write
    .partitionBy("_1")
    .mode("overwrite")
    .parquet("/home/krebai/tmp")
)

In [160]:
df.write.mode("overwrite").json("/home/krebai/tmp/json")

schema = StructType([
    StructField("_1", LongType(), True),
    StructField("_2", StringType(), True)
])

spark.read.json("/home/krebai/tmp/json", schema=schema).show()

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
+---+---+



# Compute number of business days between 2 dates

In [10]:
df_facts = spark.createDataFrame(
    [('data1', '2023-12-18', '2023-12-24'),
     ('data1', '2022-05-08', '2022-05-21')],
    ['data', 'start_date', 'end_date']
)
df_holidays = spark.createDataFrame([('2022-05-10',)], ['holiday_date'])


In [32]:
(
    df_facts.withColumn("test", sequence(to_date("start_date"), to_date("end_date")))
    .withColumn("business_days", expr("size(filter(test, x -> dayOfWeek(x) != 1 and dayOfWeek(x) != 7))"))
    .drop("test")
    .show()
)

+-----+----------+----------+-------------+
| data|start_date|  end_date|business_days|
+-----+----------+----------+-------------+
|data1|2023-12-18|2023-12-24|            5|
|data1|2022-05-08|2022-05-21|           10|
+-----+----------+----------+-------------+

