In [2]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, StructField, StructType
import pyspark.sql.functions as F
from pyspark.sql import Window

# Pandas DataFrame

In [2]:
df = pd.DataFrame({'identifiers':[1, 1, 1, 2, 2, 3, 3, 3, 3], 'values':[2, 5, 2, 3, 5, 8, 1, 5, 9]})
df

Unnamed: 0,identifiers,values
0,1,2
1,1,5
2,1,2
3,2,3
4,2,5
5,3,8
6,3,1
7,3,5
8,3,9


In [3]:
# Apply some aggregation function
df_agg = df.groupby(['identifiers'], as_index=False).mean()
df_agg

Unnamed: 0,identifiers,values
0,1,3.0
1,2,4.0
2,3,5.75


In [4]:
# Join the aggregated back to the original dataframe: broadcasting!
df_combined = df.merge(df_agg, on='identifiers', how='left', suffixes=['', '_mean'])
df_combined

Unnamed: 0,identifiers,values,values_mean
0,1,2,3.0
1,1,5,3.0
2,1,2,3.0
3,2,3,4.0
4,2,5,4.0
5,3,8,5.75
6,3,1,5.75
7,3,5,5.75
8,3,9,5.75


In [5]:
# remove mean from the values based on grouping
df_results = df_combined.copy()
df_results['values'] = df_results['values'] - df_results['values_mean']
df_results

Unnamed: 0,identifiers,values,values_mean
0,1,-1.0,3.0
1,1,2.0,3.0
2,1,-1.0,3.0
3,2,-1.0,4.0
4,2,1.0,4.0
5,3,2.25,5.75
6,3,-4.75,5.75
7,3,-0.75,5.75
8,3,3.25,5.75


# Spark DataFrame: with Left Join

In [7]:
df = pd.DataFrame({'identifiers':[1, 1, 1, 2, 2, 3, 3, 3, 3], 'values':[2, 5, 2, 3, 5, 8, 1, 5, 9]})
mySchema = StructType([StructField('identifiers', IntegerType(), True),
                       StructField('values', IntegerType(), True)])
spark = SparkSession.builder \
    .master("local") \
    .appName("Example Spark DataFrame Broadcasting Operation") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df = spark.createDataFrame(df, mySchema)
df.toPandas()

Unnamed: 0,identifiers,values
0,1,2
1,1,5
2,1,2
3,2,3
4,2,5
5,3,8
6,3,1
7,3,5
8,3,9


In [8]:
# Apply aggregation function
df_agg = df.groupby('identifiers').agg(F.mean('values').alias('values_mean'))
df_agg.toPandas()

Unnamed: 0,identifiers,values_mean
0,1,3.0
1,3,5.75
2,2,4.0


In [9]:
# Join the aggregated back to the original dataframe: broadcasting!
df_combined = df.join(df_agg, on=['identifiers'], how='left')
df_combined.toPandas()

Unnamed: 0,identifiers,values,values_mean
0,1,2,3.0
1,1,5,3.0
2,1,2,3.0
3,3,8,5.75
4,3,1,5.75
5,3,5,5.75
6,3,9,5.75
7,2,3,4.0
8,2,5,4.0


In [10]:
# remove mean from the values based on grouping
df_result = df_combined.withColumn("values", F.col("values")- F.col("values_mean"))
df_result.toPandas()

Unnamed: 0,identifiers,values,values_mean
0,1,-1.0,3.0
1,1,2.0,3.0
2,1,-1.0,3.0
3,3,2.25,5.75
4,3,-4.75,5.75
5,3,-0.75,5.75
6,3,3.25,5.75
7,2,-1.0,4.0
8,2,1.0,4.0


# Spark DataFrame: Window function way

In [3]:
df = pd.DataFrame({'identifiers':[1, 1, 1, 2, 2, 3, 3, 3, 3], 'values':[2, 5, 2, 3, 5, 8, 1, 5, 9]})
mySchema = StructType([StructField('identifiers', IntegerType(), True),
                       StructField('values', IntegerType(), True)])
spark = SparkSession.builder \
    .master("local") \
    .appName("Example Spark DataFrame Broadcasting Operation") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df = spark.createDataFrame(df, mySchema)
df.toPandas()

Unnamed: 0,identifiers,values
0,1,2
1,1,5
2,1,2
3,2,3
4,2,5
5,3,8
6,3,1
7,3,5
8,3,9


In [4]:
# Window function way of calculating means without the join
w = Window.partitionBy("identifiers")

df = df.withColumn("values_mean", F.mean("values").over(w))
df.toPandas()

Unnamed: 0,identifiers,values,values_mean
0,1,2,3.0
1,1,5,3.0
2,1,2,3.0
3,3,8,5.75
4,3,1,5.75
5,3,5,5.75
6,3,9,5.75
7,2,3,4.0
8,2,5,4.0


In [5]:
# remove mean from the values based on grouping
df_result = df.withColumn("values", F.col("values")- F.col("values_mean"))
df_result.toPandas()

Unnamed: 0,identifiers,values,values_mean
0,1,-1.0,3.0
1,1,2.0,3.0
2,1,-1.0,3.0
3,3,2.25,5.75
4,3,-4.75,5.75
5,3,-0.75,5.75
6,3,3.25,5.75
7,2,-1.0,4.0
8,2,1.0,4.0


# Spark DataFrame: Window Function Way in one step

In [6]:
df = pd.DataFrame({'identifiers':[1, 1, 1, 2, 2, 3, 3, 3, 3], 'values':[2, 5, 2, 3, 5, 8, 1, 5, 9]})
mySchema = StructType([StructField('identifiers', IntegerType(), True),
                       StructField('values', IntegerType(), True)])
spark = SparkSession.builder \
    .master("local") \
    .appName("Example Spark DataFrame Broadcasting Operation") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df = spark.createDataFrame(df, mySchema)
df.toPandas()

Unnamed: 0,identifiers,values
0,1,2
1,1,5
2,1,2
3,2,3
4,2,5
5,3,8
6,3,1
7,3,5
8,3,9


In [9]:
w = Window.partitionBy("identifiers")
df_result = df.withColumn("mean_removed_values", F.col("values") - F.mean("values").over(w))
df_result.toPandas()

Unnamed: 0,identifiers,values,mean_removed_values
0,1,2,-1.0
1,1,5,2.0
2,1,2,-1.0
3,3,8,2.25
4,3,1,-4.75
5,3,5,-0.75
6,3,9,3.25
7,2,3,-1.0
8,2,5,1.0
