In [1]:
import h2o
import numpy as np
from pysparkling.ml import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import posexplode, monotonically_increasing_id, explode
import pyspark.sql.functions as F
import pandas as pd
from pyspark.mllib.linalg import DenseMatrix, Matrices
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
from pyspark.ml.feature import VectorAssembler
# Show every column when a pandas frame is rendered. The full option key is used
# on purpose: the short pattern "max.columns" is resolved by regex and becomes
# ambiguous (OptionError) on pandas versions that also define
# "styler.render.max_columns".
pd.set_option("display.max_columns", None)

In [2]:
# Reuse the active Spark session if there is one; otherwise create it.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [3]:
# Display the session object as the cell output.
spark

In [4]:
# Uncomment to browse the available H2OMOJOSettings options:
# help(H2OMOJOSettings)

In [5]:
# Load the exported MOJO archive as an H2OMOJOModel.
mojo_path = "GLRM_model_python_1614873327762_1.zip"
model = H2OMOJOModel.createFromMojo(mojo_path)

In [6]:
# Read the CSV (first row = column names, types inferred) and drop the ZCTA5 column.
acs_raw = spark.read.csv("ACS_13_5YR_DP02_cleaned.csv", header = True, inferSchema = True)
acs_features = acs_raw.drop("ZCTA5")

# Cast every remaining column to float in ONE projection. Chaining withColumn in a
# loop adds a projection per column, which inflates the query plan on wide frames.
spark_df = acs_features.select(
    [F.col(col_name).cast('float').alias(col_name) for col_name in acs_features.columns]
)

# Attach a row identifier used by later cells to look up individual rows.
# NOTE(review): monotonically_increasing_id() is non-deterministic and encodes the
# partition index in the upper bits, so ids are unique but not consecutive and may
# differ if the frame is recomputed under another partitioning. Consider caching
# spark_df if stable ids are required.
spark_df = spark_df.withColumn("columnindex", monotonically_increasing_id())

In [7]:
# Report how many partitions the freshly loaded frame has.
initial_partition_count = spark_df.rdd.getNumPartitions()
print("Initial Partitions:" + str(initial_partition_count))

Initial Partitions:4


# Predictions without Coalesce

In [8]:
# Apply the loaded MOJO model to the frame while it still has its initial
# partitioning (no coalesce has been applied at this point).
preds = model.transform(spark_df)

In [10]:
# Peek at one scored row: a few input columns next to the model output.
preview_columns = [
    "total_households",
    "family_households",
    "families_w_children_under_18",
    "prediction",
]
preds.select(*preview_columns).limit(1).toPandas()

Unnamed: 0,total_households,family_households,families_w_children_under_18,prediction
0,7056.0,4085.0,1678.0,"([-1.0155266326488948, 0.037445892075809166, 0..."


In [12]:
# Flatten the prediction struct next to the row id, then list one output row per
# element of the `dimensions` array for a quick look.
prediction = preds.select("columnindex", "prediction.*")
exploded_dimensions = prediction.select("columnindex", F.explode(F.col("dimensions")))
exploded_dimensions.show(10)

+-----------+--------------------+
|columnindex|                 col|
+-----------+--------------------+
|          0| -1.0155266326488948|
|          0|0.037445892075809166|
|          0| 0.10817576042121546|
|          0| -0.2908371756267977|
|          0| 0.03729210222908052|
|          0|-0.43742208666031224|
|          0| -0.8294971394172237|
|          0|-0.11786640997612814|
|          0|-0.07054274199263487|
|          0| 0.14194849295052916|
+-----------+--------------------+
only showing top 10 rows



In [13]:
# Exploded view of the `dimensions` array, restricted to one specific row id.
target_row_id = 25769805880
exploded_dimensions = prediction.select("columnindex", F.explode(F.col("dimensions")))
exploded_dimensions.where(F.col("columnindex") == target_row_id).show()

+-----------+--------------------+
|columnindex|                 col|
+-----------+--------------------+
|25769805880| -0.7630376934081667|
|25769805880| 0.04222480619531732|
|25769805880| 0.10292224631855686|
|25769805880|-0.22639464082485053|
|25769805880|-0.04749883767023598|
|25769805880| -0.5637302680349653|
|25769805880| -1.0651128528810487|
|25769805880|-0.08372312815885755|
|25769805880|-0.21484516371447193|
|25769805880| 0.08854260735773849|
+-----------+--------------------+



In [14]:
# Expand the `dimensions` array into dim_0 .. dim_9 columns.
# The frame is derived from `preds` (not from `prediction` itself) so the cell is
# idempotent: re-running it does not fail on a missing `dimensions` column.
# All columns are built in one projection instead of one withColumn call each.
N_DIMENSIONS = 10  # number of dim_* columns extracted from the prediction array
dimension_columns = [
    F.col("prediction.dimensions")[i].alias("dim_{}".format(i))
    for i in range(N_DIMENSIONS)
]
prediction = preds.select("columnindex", *dimension_columns)
prediction.limit(5).toPandas()

Unnamed: 0,columnindex,dim_0,dim_1,dim_2,dim_3,dim_4,dim_5,dim_6,dim_7,dim_8,dim_9
0,0,-1.015527,0.037446,0.108176,-0.290837,0.037292,-0.437422,-0.829497,-0.117866,-0.070543,0.141948
1,1,-1.904077,0.071128,0.174555,-0.974073,0.181103,-0.77022,-1.145675,-0.126716,-0.050403,0.231671
2,2,-0.393558,-0.008361,-0.023525,-0.211169,0.062322,-0.513738,0.258782,0.048655,-0.017303,1.159121
3,3,-0.6051,0.001487,-0.000136,0.458181,-0.077596,-0.616251,0.031854,-0.129628,0.090816,0.61108
4,4,-0.951038,0.000339,0.008169,-0.007327,0.000778,-0.516757,0.522114,-0.031599,0.009877,0.855227


In [15]:
# Look up the per-dimension values for two specific row ids.
first_row_id, second_row_id = 25769805879, 25769805880
row_id = F.col("columnindex")
matches_either_id = (row_id == first_row_id) | (row_id == second_row_id)
prediction.where(matches_either_id).toPandas()

Unnamed: 0,columnindex,dim_0,dim_1,dim_2,dim_3,dim_4,dim_5,dim_6,dim_7,dim_8,dim_9
0,25769805879,-0.786698,0.044635,0.104801,-0.261875,-0.049682,-0.588091,-0.968061,-0.084342,-0.223354,0.047074
1,25769805880,-1.828583,0.088975,0.197922,-1.028102,0.013583,-0.982057,-1.147307,0.047502,-0.603387,0.434931


In [16]:
# Collect the per-dimension predictions to the driver and save them as CSV
# (without the pandas index column).
predictions_pdf = prediction.toPandas()
predictions_pdf.to_csv("mojo_predictions_wo_coalesce.csv", index = False)

# Coalescing Partitions before Model Prediction

In [17]:
# Collapse the input frame to a single partition and confirm the new count.
single_partition_df = spark_df.coalesce(1)
spark_df = single_partition_df
coalesced_partition_count = spark_df.rdd.getNumPartitions()
print("Number Partitions:" + str(coalesced_partition_count))

Number Partitions:1


In [18]:
# Score the coalesced frame and peek at one row: a few inputs plus the model output.
preds = model.transform(spark_df)
preview_columns = [
    "total_households",
    "family_households",
    "families_w_children_under_18",
    "prediction",
]
preds.select(*preview_columns).limit(1).toPandas()

Unnamed: 0,total_households,family_households,families_w_children_under_18,prediction
0,7056.0,4085.0,1678.0,"([-1.0155266326488948, 0.037445892075809166, 0..."


In [19]:
# Flatten the prediction struct next to the row id, then list one output row per
# element of the `dimensions` array.
prediction = preds.select("columnindex", "prediction.*")
exploded_dimensions = prediction.select("columnindex", F.explode(F.col("dimensions")))
exploded_dimensions.show(10)

+-----------+--------------------+
|columnindex|                 col|
+-----------+--------------------+
|          0| -1.0155266326488948|
|          0|0.037445892075809166|
|          0| 0.10817576042121546|
|          0| -0.2908371756267977|
|          0| 0.03729210222908052|
|          0|-0.43742208666031224|
|          0| -0.8294971394172237|
|          0|-0.11786640997612814|
|          0|-0.07054274199263487|
|          0| 0.14194849295052916|
+-----------+--------------------+
only showing top 10 rows



In [20]:
# Exploded view of the `dimensions` array, restricted to one specific row id.
target_row_id = 25769805880
exploded_dimensions = prediction.select("columnindex", F.explode(F.col("dimensions")))
exploded_dimensions.where(F.col("columnindex") == target_row_id).show()

+-----------+--------------------+
|columnindex|                 col|
+-----------+--------------------+
|25769805880| -0.7630376934081667|
|25769805880| 0.04222480619531732|
|25769805880| 0.10292224631855686|
|25769805880|-0.22639464082485053|
|25769805880|-0.04749883767023598|
|25769805880| -0.5637302680349653|
|25769805880| -1.0651128528810487|
|25769805880|-0.08372312815885755|
|25769805880|-0.21484516371447193|
|25769805880| 0.08854260735773849|
+-----------+--------------------+



In [21]:
# Expand the `dimensions` array into dim_0 .. dim_9 columns.
# The frame is derived from `preds` (not from `prediction` itself) so the cell is
# idempotent: re-running it does not fail on a missing `dimensions` column.
# All columns are built in one projection instead of one withColumn call each.
N_DIMENSIONS = 10  # number of dim_* columns extracted from the prediction array
dimension_columns = [
    F.col("prediction.dimensions")[i].alias("dim_{}".format(i))
    for i in range(N_DIMENSIONS)
]
prediction = preds.select("columnindex", *dimension_columns)
prediction.limit(5).toPandas()

Unnamed: 0,columnindex,dim_0,dim_1,dim_2,dim_3,dim_4,dim_5,dim_6,dim_7,dim_8,dim_9
0,0,-1.015527,0.037446,0.108176,-0.290837,0.037292,-0.437422,-0.829497,-0.117866,-0.070543,0.141948
1,1,-1.904077,0.071128,0.174555,-0.974073,0.181103,-0.77022,-1.145675,-0.126716,-0.050403,0.231671
2,2,-0.393558,-0.008361,-0.023525,-0.211169,0.062322,-0.513738,0.258782,0.048655,-0.017303,1.159121
3,3,-0.6051,0.001487,-0.000136,0.458181,-0.077596,-0.616251,0.031854,-0.129628,0.090816,0.61108
4,4,-0.951038,0.000339,0.008169,-0.007327,0.000778,-0.516757,0.522114,-0.031599,0.009877,0.855227


In [22]:
# Look up the per-dimension values for two specific row ids.
first_row_id, second_row_id = 25769805879, 25769805880
row_id = F.col("columnindex")
matches_either_id = (row_id == first_row_id) | (row_id == second_row_id)
prediction.where(matches_either_id).toPandas()

Unnamed: 0,columnindex,dim_0,dim_1,dim_2,dim_3,dim_4,dim_5,dim_6,dim_7,dim_8,dim_9
0,25769805879,-0.786698,0.044635,0.104801,-0.261875,-0.049682,-0.588091,-0.968061,-0.084342,-0.223354,0.047074
1,25769805880,-1.828583,0.088975,0.197922,-1.028102,0.013583,-0.982057,-1.147307,0.047502,-0.603387,0.434931


In [23]:
# Collect the per-dimension predictions to the driver and save them as CSV
# (without the pandas index column).
predictions_pdf = prediction.toPandas()
predictions_pdf.to_csv("mojo_predictions_w_coalesce.csv", index = False)