In [6]:
#Set up environment
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F 
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.session import SparkSession
import json
#from pyspark.sql.functions import col, udf, array, collect_list, col

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

In [None]:
#When working outside of GCP

#sc_conf = SparkConf()
#sc_conf.setAppName("testApp") #// your spark application name
#sc_conf.setMaster('local[*]') #// specify master to run on local mode
#sc = pyspark.SparkContext(conf=sc_conf)
#sc = SparkContext.getOrCreate(conf=conf)
#sc = SparkContext.getOrCreate();

In [23]:
#bigquery conf
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input_v8'.format(bucket)

conf = {
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'sauron-230322',
    'mapred.bq.input.dataset.id': 'oculi_30602',
    'mapred.bq.input.table.id': 'Spark_Label_Modeling_V3'}

In [None]:
#Loading Data from BQ table
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

In [26]:
#Providing SQL context for a flat table with headers/converting
sql_context = SQLContext(sc)
wordT = table_data.map(lambda x : x[1])
tableDf=sql_context.read.json(wordT)

In [27]:
tableDf.show()

+-----------+-------------+
|creative_id|       labels|
+-----------+-------------+
|   57587813|         Blue|
|   57587813|        Azure|
|   57587813|Electric blue|
|   57587813|      Company|
|   57834875|       Purple|
|   58335520|         Blue|
|   58335520|        Azure|
|   58385205|Electric blue|
|   58481230|         Blue|
|   58481230|        Azure|
|   58481230|Electric blue|
|   58481331|  Cobalt blue|
|   58481331|         Blue|
|   58481331|          Sky|
|   58481331|      Daytime|
|   58481331|         Aqua|
|   58481331|        Green|
|   58481331|Electric blue|
|   58481331|        Black|
|   58481331|   Atmosphere|
+-----------+-------------+
only showing top 20 rows



In [28]:
# Create dataframe for FPGrowth model input
labelsets = tableDf.groupBy("creative_id")\
      .agg(F.collect_set("labels"))

labelsets.show()

+-----------+--------------------+
|creative_id| collect_set(labels)|
+-----------+--------------------+
|  100078767|[Sport utility ve...|
|  100333935|     [Electric blue]|
|  100364801|[Kia motors, Mode...|
|  100380453|            [Yellow]|
|  100421201|[Automotive exter...|
|  100576531|[Sport utility ve...|
|  100664780| [Trademark, Yellow]|
|  100728693|[Mid-size car, Ki...|
|  100754996|[Cat, Whiskers, P...|
|  100853868|    [Graphic design]|
|  100918417|[Sport utility ve...|
|  100971229|[Sky, Real estate...|
|  100997898|[Mazda, Product, ...|
|  101076007|[Blue, Aqua, Turq...|
|  101248228|     [T-shirt, Skin]|
|  101416455|[Product, Gmc env...|
|  101453206|[Purple, Pink, Re...|
|  101463612|[Engineering, Water]|
|  101507357|[Family car, Spor...|
|  101550788|[Shoe, Product, C...|
+-----------+--------------------+
only showing top 20 rows



In [38]:
fpGrowth = FPGrowth(itemsCol="collect_set(labels)", minSupport=0.01, minConfidence=0.5)

In [39]:
model = fpGrowth.fit(labelsets)

In [40]:
# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(labelsets).show()

+--------------------+-----+
|               items| freq|
+--------------------+-----+
|     [Motor vehicle]|18845|
|      [Land vehicle]|18605|
|[Land vehicle, Mo...|10556|
| [Automotive design]|17512|
|[Automotive desig...|12070|
|[Automotive desig...| 7354|
|[Automotive desig...|11869|
|           [Product]|14782|
|[Product, Automot...| 3542|
|[Product, Automot...| 1979|
|[Product, Automot...| 1332|
|[Product, Automot...| 2626|
|[Product, Land ve...| 3194|
|[Product, Land ve...| 2107|
|[Product, Motor v...| 4254|
|[Automotive exter...|14619|
|[Automotive exter...| 8565|
|[Automotive exter...| 4712|
|[Automotive exter...| 3211|
|[Automotive exter...| 6552|
+--------------------+-----+
only showing top 20 rows

+--------------------+--------------------+------------------+
|          antecedent|          consequent|        confidence|
+--------------------+--------------------+------------------+
|             [Table]|         [Furniture]|0.9377027903958468|
|             [Table]|    