In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("dest", T.StringType()),
    T.StructField("origin", T.StringType()),
    T.StructField("flights", T.IntegerType())
])

df = (spark.read.option("header", True)
            .schema(schema)
            .csv("/FileStore/tables/flights2010/2010_summary-1.csv"))

display(df.limit(10))

dest,origin,flights
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
df = df.withColumn(
    "origin",
    F.when((F.col("dest").isin(["Egypt", "United States"])) & (F.col("flights") < 26) , F.lit(None)).otherwise(F.col("origin"))
)

In [0]:
display(df.limit(10))

dest,origin,flights
United States,,1
United States,Ireland,264
United States,India,69
Egypt,,24
Equatorial Guinea,United States,1
United States,,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Dropping NaNs

---

Two methods (Actually both of these methods are each others alliases)

In [0]:
display(df.na.drop().limit(10))

dest,origin,flights
United States,Ireland,264
United States,India,69
Equatorial Guinea,United States,1
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44
Guyana,United States,17
United States,Sint Maarten,53
Malta,United States,1


In [0]:
display(df.dropna().limit(10))

dest,origin,flights
United States,Ireland,264
United States,India,69
Equatorial Guinea,United States,1
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44
Guyana,United States,17
United States,Sint Maarten,53
Malta,United States,1


In [0]:
display(df.fillna({"origin": "AFRICAN_CAVE"}).limit(20))

dest,origin,flights
United States,AFRICAN_CAVE,1
United States,Ireland,264
United States,India,69
Egypt,AFRICAN_CAVE,24
Equatorial Guinea,United States,1
United States,AFRICAN_CAVE,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
display(df.na.replace(["Ireland"], None).limit(10))

dest,origin,flights
United States,,1
United States,,264
United States,India,69
Egypt,,24
Equatorial Guinea,United States,1
United States,,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
display(df.replace(["Ireland", "Grenada"], "CAVE!").limit(10))

dest,origin,flights
United States,,1
United States,CAVE!,264
United States,India,69
Egypt,,24
Equatorial Guinea,United States,1
United States,,25
United States,CAVE!,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
df_t = df.withColumn(
    "random_id",
    F.array(F.rand(), F.rand())
)

display(df_t.limit(10))

dest,origin,flights,random_id
United States,,1,"List(0.6060300133382945, 0.28989790740641797)"
United States,Ireland,264,"List(0.9092719009982054, 0.4661610086110489)"
United States,India,69,"List(0.7780189682258168, 0.598101903424138)"
Egypt,,24,"List(0.5450045240189886, 0.11557603423842067)"
Equatorial Guinea,United States,1,"List(0.8875993990463372, 0.8277320308183174)"
United States,,25,"List(0.1627235275647222, 0.40416230794840446)"
United States,Grenada,54,"List(0.14363213704967703, 0.7271467337115114)"
Costa Rica,United States,477,"List(0.08515340990090958, 0.5028811135495342)"
Senegal,United States,29,"List(0.13781881655053707, 0.658067304302552)"
United States,Marshall Islands,44,"List(0.9505931820043269, 0.004191571160683583)"


In [0]:
display(df_t.select("*", F.explode(F.col("random_id").alias("rid"))).drop("random_id").limit(10))

dest,origin,flights,col
United States,,1,0.6060300133382945
United States,,1,0.2898979074064179
United States,Ireland,264,0.9092719009982054
United States,Ireland,264,0.4661610086110489
United States,India,69,0.7780189682258168
United States,India,69,0.598101903424138
Egypt,,24,0.5450045240189886
Egypt,,24,0.1155760342384206
Equatorial Guinea,United States,1,0.8875993990463372
Equatorial Guinea,United States,1,0.8277320308183174


In [0]:
df = df.withColumn(
    "dest_no_vowel",
    F.regexp_replace("dest", r"[aouiey]", "")
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel
United States,,1,Untd Stts
United States,Ireland,264,Untd Stts
United States,India,69,Untd Stts
Egypt,,24,Egpt
Equatorial Guinea,United States,1,Eqtrl Gn
United States,,25,Untd Stts
United States,Grenada,54,Untd Stts
Costa Rica,United States,477,Cst Rc
Senegal,United States,29,Sngl
United States,Marshall Islands,44,Untd Stts


In [0]:
df = df.withColumn(
    "dest_last2_first2",
    F.regexp_extract("dest", r".{2}\ .{2}", 0)
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2
United States,,1,Untd Stts,ed St
United States,Ireland,264,Untd Stts,ed St
United States,India,69,Untd Stts,ed St
Egypt,,24,Egpt,
Equatorial Guinea,United States,1,Eqtrl Gn,al Gu
United States,,25,Untd Stts,ed St
United States,Grenada,54,Untd Stts,ed St
Costa Rica,United States,477,Cst Rc,ta Ri
Senegal,United States,29,Sngl,
United States,Marshall Islands,44,Untd Stts,ed St


In [0]:
df = df.withColumn(
    "temp_origin",
    F.ifnull(F.col("origin"), F.lit("AFRICAN_CAVE"))
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin
United States,,1,Untd Stts,ed St,AFRICAN_CAVE
United States,Ireland,264,Untd Stts,ed St,Ireland
United States,India,69,Untd Stts,ed St,India
Egypt,,24,Egpt,,AFRICAN_CAVE
Equatorial Guinea,United States,1,Eqtrl Gn,al Gu,United States
United States,,25,Untd Stts,ed St,AFRICAN_CAVE
United States,Grenada,54,Untd Stts,ed St,Grenada
Costa Rica,United States,477,Cst Rc,ta Ri,United States
Senegal,United States,29,Sngl,,United States
United States,Marshall Islands,44,Untd Stts,ed St,Marshall Islands


In [0]:
display(df.withColumn("dest", F.replace("dest", F.lit("ed"), F.lit("y"))).limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin
Unity States,,1,Untd Stts,ed St,AFRICAN_CAVE
Unity States,Ireland,264,Untd Stts,ed St,Ireland
Unity States,India,69,Untd Stts,ed St,India
Egypt,,24,Egpt,,AFRICAN_CAVE
Equatorial Guinea,United States,1,Eqtrl Gn,al Gu,United States
Unity States,,25,Untd Stts,ed St,AFRICAN_CAVE
Unity States,Grenada,54,Untd Stts,ed St,Grenada
Costa Rica,United States,477,Cst Rc,ta Ri,United States
Senegal,United States,29,Sngl,,United States
Unity States,Marshall Islands,44,Untd Stts,ed St,Marshall Islands


### Aggregate functions

---

In [0]:
# simple groupby

display(df.groupby(F.col("origin"))
          .agg(F.sum("flights").alias("flights_sum"))
          .sort(F.col("flights_sum").desc())
          .limit(10))

origin,flights_sum
United States,385426
Canada,8305
Mexico,6220
United Kingdom,1503
Germany,1406
Japan,1307
Dominican Republic,1150
The Bahamas,959
Colombia,832
France,776


In [0]:
display(df.groupby("origin")
          .agg(F.skewness("flights").alias("flights_skewness"))
          .sort(F.col("flights_skewness").desc())
          .limit(10))

origin,flights_skewness
United States,10.985101177961166
,0.1738389115903938
Russia,
Senegal,
Sweden,
Philippines,
Fiji,
Turkey,
Germany,
Jordan,


In [0]:
display(df.groupby(F.substring("origin", 1, 1).alias("origin_first_letter"))
  .agg(F.count("flights").alias("count"))
  .sort(F.col("count").desc())
  .limit(10))

origin_first_letter,count
U,126
,51
S,11
B,7
C,7
P,6
G,6
T,5
I,5
A,5


In [0]:
def get_arrays(val):
    return list(range(0, val, val//10 + 1))

array_udf = F.udf(get_arrays, T.ArrayType(T.IntegerType()))

In [0]:
df = df.withColumn(
    "crazy_array",
    array_udf("flights")
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin,crazy_array
United States,,1,Untd Stts,ed St,AFRICAN_CAVE,List(0)
United States,Ireland,264,Untd Stts,ed St,Ireland,"List(0, 27, 54, 81, 108, 135, 162, 189, 216, 243)"
United States,India,69,Untd Stts,ed St,India,"List(0, 7, 14, 21, 28, 35, 42, 49, 56, 63)"
Egypt,,24,Egpt,,AFRICAN_CAVE,"List(0, 3, 6, 9, 12, 15, 18, 21)"
Equatorial Guinea,United States,1,Eqtrl Gn,al Gu,United States,List(0)
United States,,25,Untd Stts,ed St,AFRICAN_CAVE,"List(0, 3, 6, 9, 12, 15, 18, 21, 24)"
United States,Grenada,54,Untd Stts,ed St,Grenada,"List(0, 6, 12, 18, 24, 30, 36, 42, 48)"
Costa Rica,United States,477,Cst Rc,ta Ri,United States,"List(0, 48, 96, 144, 192, 240, 288, 336, 384, 432)"
Senegal,United States,29,Sngl,,United States,"List(0, 3, 6, 9, 12, 15, 18, 21, 24, 27)"
United States,Marshall Islands,44,Untd Stts,ed St,Marshall Islands,"List(0, 5, 10, 15, 20, 25, 30, 35, 40)"


IPv4Address('0.0.5.251')

In [0]:
import pandas as pd
import ipaddress as ipa

@F.pandas_udf(T.StringType())
def get_ip(series: pd.Series) -> pd.Series:
    return series.apply(lambda x: str(ipa.IPv4Address(x)))

In [0]:
df = df.withColumn(
    "crazy_column_pd",
    get_ip(F.col("flights"))
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin,crazy_array,crazy_column_pd
United States,,1,Untd Stts,ed St,AFRICAN_CAVE,List(0),0.0.0.1
United States,Ireland,264,Untd Stts,ed St,Ireland,"List(0, 27, 54, 81, 108, 135, 162, 189, 216, 243)",0.0.1.8
United States,India,69,Untd Stts,ed St,India,"List(0, 7, 14, 21, 28, 35, 42, 49, 56, 63)",0.0.0.69
Egypt,,24,Egpt,,AFRICAN_CAVE,"List(0, 3, 6, 9, 12, 15, 18, 21)",0.0.0.24
Equatorial Guinea,United States,1,Eqtrl Gn,al Gu,United States,List(0),0.0.0.1
United States,,25,Untd Stts,ed St,AFRICAN_CAVE,"List(0, 3, 6, 9, 12, 15, 18, 21, 24)",0.0.0.25
United States,Grenada,54,Untd Stts,ed St,Grenada,"List(0, 6, 12, 18, 24, 30, 36, 42, 48)",0.0.0.54
Costa Rica,United States,477,Cst Rc,ta Ri,United States,"List(0, 48, 96, 144, 192, 240, 288, 336, 384, 432)",0.0.1.221
Senegal,United States,29,Sngl,,United States,"List(0, 3, 6, 9, 12, 15, 18, 21, 24, 27)",0.0.0.29
United States,Marshall Islands,44,Untd Stts,ed St,Marshall Islands,"List(0, 5, 10, 15, 20, 25, 30, 35, 40)",0.0.0.44
