## [Vectorized UDFs](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)

In [1]:
import shutil

from pyspark.sql import SparkSession

# Remove any `metastore_db` directory in the working directory before starting
# (presumably the local metastore a previous Spark session left behind - confirm).
# shutil is used instead of a `!rm -rf` shell escape so the cell also runs on
# platforms without `rm`; ignore_errors keeps the "-f" behaviour of not failing
# when the directory does not exist.
shutil.rmtree('metastore_db', ignore_errors=True)
spark = SparkSession.builder.getOrCreate()

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, count, lit
from pyspark.sql.functions import pandas_udf, PandasUDFType

## Generating Data

In [3]:
n = 100

# n**2 rows in total. Dividing the running row index by n and casting back to an
# integer buckets the rows into n groups (ids 0 .. n-1) of n rows each; `v` is a
# random draw per row.
df = (
    spark.range(0, n**2)
    .withColumn('id', (col('id') / n).cast('integer'))
    .withColumn('v', rand())
)
# The frame is reused by every benchmark below, so keep it cached.
df.cache()
df.show()
df.count()

+---+--------------------+
| id|                   v|
+---+--------------------+
|  0|  0.9334557717171508|
|  0|  0.7182465751045117|
|  0| 0.07687686651730008|
|  0|  0.9171707930355599|
|  0| 0.46214282406485385|
|  0|  0.6749666404300179|
|  0| 0.37233475433767327|
|  0|  0.5402371951860131|
|  0| 0.16372292267436628|
|  0|  0.6140353324931549|
|  0|0.044091439131924814|
|  0|  0.5842768052655765|
|  0| 0.16150741124118562|
|  0|  0.9481448624772765|
|  0|  0.5173938471345927|
|  0|  0.9060691507727601|
|  0|  0.6124029392616874|
|  0|  0.6815583021157648|
|  0| 0.22999142836663256|
|  0| 0.10026431864323271|
+---+--------------------+
only showing top 20 rows



10000

## Plus One

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Explicit (non-decorator) way to build a row-at-a-time UDF: wrap a plain Python
# callable and state the Spark return type as a DataType instance. The callable
# is given inline so that this cell does not depend on a name that is only
# defined in a later cell, and `udf` is imported here so the cell runs on a
# fresh kernel.
udf_plus_one = udf(lambda v: v + 1, DoubleType())

In [None]:
from pyspark.sql.functions import udf

@udf('double')
def plus_one(v):
    return v + 1


%timeit df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

In [None]:
# Preview the first rows produced by the row-at-a-time UDF
(
    df.withColumn('v', plus_one(df['v']))
    .show(5)
)

In [None]:
@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

%timeit df.withColumn('v', pandas_plus_one(df.v)).agg(count(col('v'))).show()

## Cumulative Probability

In [None]:
import pandas as pd
from scipy import stats

@udf('double')
def cdf(v):
    return float(stats.norm.cdf(v))

%timeit df.withColumn('cumulative_probability', cdf(df.v)).agg(count(col('cumulative_probability'))).show()

In [None]:
# Preview the first rows with the row-at-a-time cumulative probability attached
(
    df.withColumn('cumulative_probability', cdf(df['v']))
    .show(5)
)

In [None]:
import pandas as pd
from scipy import stats

@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_cdf(v):
    return pd.Series(stats.norm.cdf(v))

%timeit df.withColumn('cumulative_probability', pandas_cdf(df.v)).agg(count(col('cumulative_probability'))).show()

## Operation on a Group - Subtract Mean

In [None]:
from pyspark.sql import Row

@udf(ArrayType(df.schema))
def subtract_mean(rows):
    vs = pd.Series([r.v for r in rows])
    vs = vs - vs.mean()
    return [Row(id=rows[i]['id'], v=float(vs[i])) for i in range(len(rows))]
  
%timeit df.groupby('id').agg(collect_list(struct(df['id'], df['v'])).alias('rows')).withColumn('new_rows', subtract_mean(col('rows'))).withColumn('new_row', explode(col('new_rows'))).withColumn('id', col('new_row.id')).withColumn('v', col('new_row.v')).agg(count(col('v'))).show()

In [None]:
# Same pipeline as the timed cell above, one step per line, showing the first rows
(
    df.groupby('id')
    .agg(collect_list(struct(df['id'], df['v'])).alias('rows'))
    .withColumn('new_rows', subtract_mean(col('rows')))
    .withColumn('new_row', explode(col('new_rows')))
    .withColumn('id', col('new_row.id'))
    .withColumn('v', col('new_row.v'))
    .show(5)
)

In [None]:
# Inspect df's schema: this is the object the UDFs in this section declare as their
# output type (wrapped in ArrayType for subtract_mean, as-is for pandas_subtract_mean).
df.schema

In [None]:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def pandas_subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

%timeit df.groupby('id').apply(pandas_subtract_mean).agg(count(col('v'))).show()

In [None]:
# Preview the first rows of the group-centred frame
(
    df.groupby('id')
    .apply(pandas_subtract_mean)
    .show(5)
)

## OLS

In [None]:
# Keep the group ids from df and attach a random response `y` plus two random regressors
df2 = (
    df.withColumn('y', rand())
    .withColumn('x1', rand())
    .withColumn('x2', rand())
    .select('id', 'y', 'x1', 'x2')
)
df2.show()

In [None]:
import pandas as pd
import statsmodels.api as sm

# The input frame (df2) has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
# Output layout: the group key followed by one column per regressor
schema = df2.select(group_column, *x_columns).schema


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ols(pdf):
    """Fit an ordinary-least-squares model on one group and return its coefficients.

    Input/output are both a pandas.DataFrame. The output has a single row: the
    group key followed by the fitted coefficient of each column in `x_columns`.
    A constant term is added to the regressors before fitting, but its
    coefficient is not part of the returned row.
    """
    group_key = pdf[group_column].iloc[0]
    design = sm.add_constant(pdf[x_columns])
    fitted = sm.OLS(pdf[y_column], design).fit()
    coefficients = [fitted.params[name] for name in x_columns]
    return pd.DataFrame(
        [[group_key] + coefficients],
        columns=[group_column] + x_columns,
    )


beta = df2.groupby(group_column).apply(ols)
beta.show()

In [None]:
from sklearn.linear_model import LinearRegression

# Cross-check on a single group: pull group 31 to the driver as a pandas frame
# and fit the same two-regressor model with scikit-learn.
pandas_df = df2.filter('id == 31').toPandas().loc[:, ['x1', 'x2', 'y']]
model = LinearRegression().fit(
    X=pandas_df[['x1', 'x2']],
    y=pandas_df[['y']],
)
model.coef_

In [None]:
# Display the single-group pandas frame (columns x1, x2, y) that the LinearRegression model was fitted on
pandas_df