In [2]:
import pandas as pd
from typing import *
import re
# from fpgrowth_py import fpgrowth
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
import numpy as np

In [3]:
# Load the generated results; only the first three columns are used for mining.
# Missing cells are filled with the string "None" so every value can be lower-cased below.
files = pd.read_csv(r"../Generated_Result/10k.csv", sep=";").fillna("None")
# .copy() so the column assignments below modify an independent frame rather than a
# possible view of `files` (avoids SettingWithCopyWarning / chained-assignment ambiguity).
obj = files.iloc[:, :3].copy()
for col in obj.columns:
    # Any value whose text contains "none" (case-insensitive), including the fillna
    # placeholders above, becomes NaN. NOTE(review): this is a substring match, so words
    # such as "nonetheless" are also blanked; confirm this is intended.
    obj[col] = obj[col].apply(lambda x: np.nan if "none" in x.lower() else x)


In [4]:
# One Python list per row: [col0, col1, col2], with NaN marking missing cells.
sets = obj.to_numpy().tolist()

In [5]:
# Peek at the first raw row; missing cells appear as nan.
# (The former bare `np.isnan(sets[0][-1])` was discarded output and raised TypeError
# whenever the last cell held a string, so it has been removed.)
print(sets[0])

['Geological CO2 sequestration', 'Mitigation of anthropogenic carbon dioxide emissions|injection capacity and safety of the storage system|fluid flow in porous media|etc', nan]


In [6]:
def _split_row(row):
    """Flatten one raw row into a list of stripped items.

    Missing cells (anything ``pd.isnull`` accepts) are skipped. Every remaining cell is
    split on "|" and each piece is whitespace-stripped, in the original cell/item order.
    """
    return [
        item.strip()
        for cell in row
        if not pd.isnull(cell)
        for item in cell.split("|")
    ]


# Rebuild from `obj` instead of rewriting `sets` in place, so re-running this cell is
# idempotent. (The in-place version crashed on a second run because the rows were already lists.)
sets = [_split_row(row) for row in obj.values.tolist()]

In [7]:
from functools import singledispatch

# Items matching this pattern are treated as noise: "yes"/"Yes" answers, the "etc" filler,
# decimal numbers such as "1.0" or "-2.5", and the literal token "000".
_NOISE_PATTERN = re.compile(r'\b(?:yes|Yes|etc|-?\d+\.\d+|000)\b')


def _drop_noise(val: List[str]) -> List[str]:
    """Return the items of ``val`` that contain no noise token, preserving order."""
    return [item for item in val if not _NOISE_PATTERN.search(item)]


@singledispatch
def myFunc(marker, val):
    """Clean a list of item strings; the type of ``marker`` selects the behaviour.

    - ``float`` marker: drop noise items, keeping duplicates and order.
    - ``int`` marker: drop noise items and remove duplicates (first occurrence wins).
    - any other type: print a notice and return ``None``.
    """
    print("This is the default function")


@myFunc.register(float)
def _(marker: float, val: List[str]) -> List[str]:
    return _drop_noise(val)


@myFunc.register(int)
def _(marker: int, val: List[str]) -> List[str]:
    print(f"the marker type is {type(marker)}, it will clean the duplicated value.")
    # dict.fromkeys de-duplicates while keeping first-seen order. set() iterates strings
    # in an order that can change between interpreter runs (hash randomisation).
    return list(dict.fromkeys(_drop_noise(val)))

In [8]:
# Sanity-check the noise regex used by myFunc on a few hand-written samples.
# Decimals and the bare "000" token should match; integers and plain words should not.
testing_pattern = r"\b(?:yes|Yes|etc|-?\d+\.\d+|000)\b"
words1 = "here is a pattern owns 1.0 and 2.0"
word2 = "here is a pattern owns 1 and 2"
word3 = "here is a pattern owns no number"
word4 = "000"
samples = (words1, word2, word3, word4)
for word in samples:
    print(re.search(testing_pattern, word))

<re.Match object; span=(23, 26), match='1.0'>
None
None
<re.Match object; span=(0, 3), match='000'>


In [9]:
# Inspect the first row after splitting on "|": a flat list of stripped item strings.
sets[0]

['Geological CO2 sequestration',
 'Mitigation of anthropogenic carbon dioxide emissions',
 'injection capacity and safety of the storage system',
 'fluid flow in porous media',
 'etc']

In [10]:
# Strip noise items from every non-empty transaction. The float marker selects the
# myFunc variant that keeps duplicates and the original item order.
sets_proper = []
for transaction in sets:
    if not transaction:
        continue
    sets_proper.append(myFunc(1.0, transaction))

### FP-Growth with PySpark (`pyspark.ml.fpm.FPGrowth`)

In [None]:
import findspark
# NOTE(review): findspark.init() can only choose which Spark installation pyspark uses if it
# runs before pyspark is first imported. pyspark is already imported in the first cell, so
# consider moving this call there. A pyspark / Spark JAR mismatch would be consistent with
# the NoSuchMethodError recorded further down.
findspark.init()

spark = SparkSession.builder.getOrCreate()
# Single column "items": the list of item strings in one transaction.
schema = StructType([
    StructField("items", ArrayType(StringType()), nullable=True)
])
# Small toy transaction set; not used below, kept for quick manual experiments.
A = [
    ["r", "z", "h", "k", "p"],
    ["z", "y", "x", "w", "v"],
    ["s", "x", "o", "n", "r"],
    ["x", "z", "y", "m", "t"],
    ["z"],
    ["x", "z", "y", "r", "q"]
]
rdd = spark.sparkContext.parallelize(sets_proper)
# Spark's FPGrowth rejects transactions that contain repeated items, and the float-marker
# cleaning in myFunc keeps duplicates. So de-duplicate each transaction here, keeping the
# first occurrence and the original order.
df = spark.createDataFrame(rdd.map(lambda x: (list(dict.fromkeys(x)),)), schema)

In [12]:
# Preview the leading rows of the transaction DataFrame with full (untruncated) item lists.
df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|items                                                                                                                                                                                                                                                                                                                                                                                |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
from pyspark.ml.fpm import FPGrowth
# Configure Spark FP-Growth on the "items" column.
# NOTE(review): assuming minSupport is a fraction of transactions, then with the 8802
# training records reported by the fitted model, 0.000107 * 8802 ≈ 0.94 is below a single
# transaction. Every itemset that occurs at least once may therefore count as frequent;
# confirm this is intended.
fp = FPGrowth(itemsCol="items", minSupport=0.000107, minConfidence=0.1)

In [14]:
# Train the FP-Growth model on the Spark transaction DataFrame.
# NOTE: `df` is rebound to a pandas DataFrame in the mlxtend section below, so re-running
# this cell after that section will not receive the Spark frame.
fpm=fp.fit(df)

In [15]:
# Rename the model's prediction output column. The call returns the model, which is why
# the cell output is the FPGrowthModel repr.
fpm.setPredictionCol("newPrediction")
# fpm.freqItemsets.sort("items").show(5)

FPGrowthModel: uid=FPGrowth_7868bf101870, numTrainingRecords=8802

In [16]:
import pyspark.sql.functions as F

# Flatten the array-valued rule columns into '-'-joined strings so the rules can be written to CSV.
# NOTE(review): items can themselves contain '-' (e.g. "1-10-AgNPs" appears in the item
# encoding below), so this join cannot be reversed. "|" never occurs inside an item,
# because items were produced by splitting on it.
fp_association = fpm.associationRules
for rule_col in ("antecedent", "consequent"):
    fp_association = fp_association.withColumn(
        rule_col,
        F.concat_ws('-', F.col(rule_col).cast("array<string>")),
    )
new_data = spark.createDataFrame([(["t", "s"], )], ["items"])

In [None]:
# Persist the flattened association rules; mode="overwrite" replaces earlier output.
# NOTE(review): Spark typically writes a directory of part files at this path rather than a
# single CSV file. The next cell records a NoSuchMethodError raised by this write.
fp_association.write.csv(r"../FP_Growth_Inference/fpres.csv", mode = "overwrite", header=True)

In [None]:
# Error recorded when the CSV write above was executed. This cell only prints it, for reference.
# NOTE(review): a NoSuchMethodError inside org.apache.spark classes usually indicates
# mismatched pyspark / Spark JAR versions, possibly findspark.init() selecting a different
# Spark installation than the imported pyspark. Confirm the versions match.
print("""
"An error occurred while calling o164.csv.\n: java.lang.NoSuchMethodError: org.apache.spark.SparkThrowableHelper$.getMessage(Ljava/lang/Throwable;)Ljava/lang/String;\r\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:130)\r\n\tat scala.Option.map(Option.scala:230)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)\r\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)\r\n\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)\r\n\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)\r\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)\r\n\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)\r\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)\r\n\tat 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)\r\n\tat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)\r\n\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)\r\n\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)\r\n\tat org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)\r\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)\r\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)\r\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)\r\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)\r\n\tat org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:847)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\r\n\tat java.lang.reflect.Method.invoke(Unknown Source)\r\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\r\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\r\n\tat py4j.Gateway.invoke(Gateway.java:282)\r\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\r\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\r\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\r\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\r\n\tat java.lang.Thread.run(Unknown Source)\r\n",""")

### FP-Growth with mlxtend (`mlxtend.frequent_patterns.fpgrowth`)

In [11]:
from mlxtend.frequent_patterns import fpgrowth
from mlxtend.preprocessing import TransactionEncoder
import pandas as pd

In [12]:
# Total number of items across all cleaned transactions.
sum(map(len, sets_proper))

41790

In [13]:
# One-hot encode the transactions: one boolean column per distinct item.
# NOTE(review): this rebinds `df`, shadowing the Spark DataFrame used by the FP-Growth
# cells above; re-running those cells after this one will not see the Spark frame.
te = TransactionEncoder()
te.fit(sets_proper)
te_ary = te.transform(sets_proper)
df = pd.DataFrame(te_ary, columns=te.columns_)

In [14]:
# Peek at the one-hot encoded transactions (rows = transactions, columns = distinct items).
df.head()

Unnamed: 0,Unnamed: 1,0,0 inches (minimum ZOI) - 9 inches (maximum ZOI,001 New Zealand houses,1,1) a subculture of permissiveness toward drinking and driving for men,1) enhanced flexibility in design,1) evaluates the impact of specific categories of residual risks (actor,1) for capturing knowledge about users,1-10-AgNPs,...,younger patients,younger riders (in particular undergraduate students,youths,zebrafish,zero-sum stochastic differential game,zinc,zinc (Zn,zinc sulfate,zone ventilation,zooplankton (with either low or high concentrations of MeHg
0,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
1,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
2,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
3,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
4,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False


In [None]:
# Quick sparsity check on the one-hot encoding. For the first item columns, list the row
# positions where the item occurs, or report only how many rows contain it when there are many.
MAX_COLUMNS_TO_INSPECT = 100
MAX_INDICES_TO_LIST = 5
for column in df.columns[:MAX_COLUMNS_TO_INSPECT]:
    true_indices = df.index[df[column].eq(True)].tolist()
    if len(true_indices) > MAX_INDICES_TO_LIST:
        # Too many to list; previously this count was mislabelled as "True indices".
        print(f"Number of True rows in column '{column}': {len(true_indices)}")
    else:
        print(f"True indices in column '{column}': {true_indices}")

In [None]:
# NOTE(review): `counter` is not defined in any visible cell, so this raises NameError on a
# fresh kernel (Restart & Run All). It presumably came from a deleted cell; define it here
# or remove this cell.
pd.DataFrame(counter)

In [15]:
# Mine frequent itemsets with mlxtend. use_colnames=True reports item names rather than
# column positions in the itemsets column.
fq_set = fpgrowth(df, use_colnames=True, min_support=0.0002)
fq_set.shape

(4092, 2)

In [21]:
# Display the frequent itemsets found by mlxtend (columns: support, itemsets).
fq_set

Unnamed: 0,support,itemsets
50,0.000227,(driving)
51,0.000227,(Texas Department of Transportation)
52,0.004544,(Researchers)
53,0.000454,(and investors)
54,0.000227,(and waste management)
55,0.004885,(regulatory agencies)
56,0.002954,(operators)
57,0.020791,(policymakers)
58,0.000454,(poverty)
59,0.000341,(Scientific communities)
