<a href="https://colab.research.google.com/github/sainey-manga/AMD-project/blob/main/AMD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 63.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=bd2b3b5fa2536ce1812efbdbf94fca8144d623f6562335080e7da08349e486eb
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [4]:
from google.colab import files
json_file = files.upload()
!mkdir /root/.kaggle
!mv kaggle.json /root/.kaggle/kaggle.json
!chmod 600 /root/.kaggle/kaggle.json

Saving kaggle.json to kaggle (1).json
mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [5]:
import kaggle
from kaggle.api.kaggle_api_extended import KaggleApi

In [8]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [9]:
conf = SparkConf().setAppName("MarketBasketAnalysis")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '8G')
        .set('spark.driver.memory', '45G')
        .set('spark.driver.maxResultSize', '0')
        .set("spark.sql.execution.arrow.enabled", "true"))
sc = SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)



In [13]:
!kaggle datasets download -d stackoverflow/stacksample -f Questions.csv --unzip -o
!unzip Questions.csv.zip

Downloading Questions.csv.zip to /content
 99% 582M/589M [00:03<00:00, 198MB/s]
100% 589M/589M [00:03<00:00, 188MB/s]
Archive:  Questions.csv.zip
  inflating: Questions.csv           


In [14]:
df = spark.read\
  .option("multiLine", "true")\
  .option("header", "true")\
  .option("escape", "\"")\
  .csv("Questions.csv")\
  .select("Id","Body")

In [15]:
df.show(10)

+---+--------------------+
| Id|                Body|
+---+--------------------+
| 80|<p>I've written a...|
| 90|<p>Are there any ...|
|120|<p>Has anyone got...|
|180|<p>This is someth...|
|260|<p>I have a littl...|
|330|<p>I am working o...|
|470|<p>I've been writ...|
|580|<p>I wonder how y...|
|650|<p>I would like t...|
|810|<p>I'm trying to ...|
+---+--------------------+
only showing top 10 rows



In [16]:
df.count()

1264216

# Data Cleaning

The function clean_text is set up to revove unwanted characters from the the corpus

In [17]:
from pyspark.sql.functions import col, lower, regexp_replace, split

In [18]:
def clean_text(t):
    t = lower(t)
    t = regexp_replace(t, "^rt ", "")
    t = regexp_replace(t, "<p>", "")
    t = regexp_replace(t, "</p>", "")
    t = regexp_replace(t, "(https?\://)\S+", "")
    t = regexp_replace(t, "[^a-zA-Z0-9\\s]", "")
    return t

clean_df = df.select('Id',clean_text(col("Body")).alias("Body"))

In [19]:
clean_df.show(10)

+---+--------------------+
| Id|                Body|
+---+--------------------+
| 80|ive written a dat...|
| 90|are there any rea...|
|120|has anyone got ex...|
|180|this is something...|
|260|i have a little g...|
|330|i am working on a...|
|470|ive been writing ...|
|580|i wonder how you ...|
|650|i would like the ...|
|810|im trying to main...|
+---+--------------------+
only showing top 10 rows



In [20]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

In [21]:
tokenizer = RegexTokenizer(inputCol="Body", outputCol="tokens")
tokenize_df = tokenizer.transform(clean_df)
tokenize_df.show(10)

+---+--------------------+--------------------+
| Id|                Body|              tokens|
+---+--------------------+--------------------+
| 80|ive written a dat...|[ive, written, a,...|
| 90|are there any rea...|[are, there, any,...|
|120|has anyone got ex...|[has, anyone, got...|
|180|this is something...|[this, is, someth...|
|260|i have a little g...|[i, have, a, litt...|
|330|i am working on a...|[i, am, working, ...|
|470|ive been writing ...|[ive, been, writi...|
|580|i wonder how you ...|[i, wonder, how, ...|
|650|i would like the ...|[i, would, like, ...|
|810|im trying to main...|[im, trying, to, ...|
+---+--------------------+--------------------+
only showing top 10 rows



In [22]:
st_wrd_remover= StopWordsRemover(inputCol = 'tokens', outputCol = 'removed_st_wrd')
final_df= st_wrd_remover.transform(tokenize_df)
final_df.show(10)

+---+--------------------+--------------------+--------------------+
| Id|                Body|              tokens|      removed_st_wrd|
+---+--------------------+--------------------+--------------------+
| 80|ive written a dat...|[ive, written, a,...|[ive, written, da...|
| 90|are there any rea...|[are, there, any,...|[really, good, tu...|
|120|has anyone got ex...|[has, anyone, got...|[anyone, got, exp...|
|180|this is something...|[this, is, someth...|[something, ive, ...|
|260|i have a little g...|[i, have, a, litt...|[little, game, wr...|
|330|i am working on a...|[i, am, working, ...|[working, collect...|
|470|ive been writing ...|[ive, been, writi...|[ive, writing, we...|
|580|i wonder how you ...|[i, wonder, how, ...|[wonder, guys, ma...|
|650|i would like the ...|[i, would, like, ...|[like, version, p...|
|810|im trying to main...|[im, trying, to, ...|[im, trying, main...|
+---+--------------------+--------------------+--------------------+
only showing top 10 rows



# HASHING AND MODEL FITTING USING JACCARD DISTANCE

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH

In [24]:
import time
# track time
start = time.time()

# pipeline with sequence of tasks required to perform LSH with the Jaccard distance
model = Pipeline(stages=[
        HashingTF(inputCol="removed_st_wrd", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="hash", seed=123)
    ]).fit(final_df)

df_hashed = model.transform(final_df)

In [25]:
df_hashed.show(10)

+---+--------------------+--------------------+--------------------+--------------------+---------------+
| Id|                Body|              tokens|      removed_st_wrd|             vectors|           hash|
+---+--------------------+--------------------+--------------------+--------------------+---------------+
| 80|ive written a dat...|[ive, written, a,...|[ive, written, da...|(262144,[3564,461...|[[4.7238581E7]]|
| 90|are there any rea...|[are, there, any,...|[really, good, tu...|(262144,[62427,68...|[[3.9862902E7]]|
|120|has anyone got ex...|[has, anyone, got...|[anyone, got, exp...|(262144,[386,3564...|[[1.8689312E7]]|
|180|this is something...|[this, is, someth...|[something, ive, ...|(262144,[10345,37...| [[4.027477E7]]|
|260|i have a little g...|[i, have, a, litt...|[little, game, wr...|(262144,[5352,126...|[[1.2793449E7]]|
|330|i am working on a...|[i, am, working, ...|[working, collect...|(262144,[8538,978...|[[4.1105192E7]]|
|470|ive been writing ...|[ive, been, writi...

In [26]:
from pyspark.sql.functions import size
import pyspark.sql.functions as fun

In [27]:
sample= df_hashed.where((size(col("removed_st_wrd")) >= 1)).limit(40000)

In [28]:
sample.show(10)

+---+--------------------+--------------------+--------------------+--------------------+---------------+
| Id|                Body|              tokens|      removed_st_wrd|             vectors|           hash|
+---+--------------------+--------------------+--------------------+--------------------+---------------+
| 80|ive written a dat...|[ive, written, a,...|[ive, written, da...|(262144,[3564,461...|[[4.7238581E7]]|
| 90|are there any rea...|[are, there, any,...|[really, good, tu...|(262144,[62427,68...|[[3.9862902E7]]|
|120|has anyone got ex...|[has, anyone, got...|[anyone, got, exp...|(262144,[386,3564...|[[1.8689312E7]]|
|180|this is something...|[this, is, someth...|[something, ive, ...|(262144,[10345,37...| [[4.027477E7]]|
|260|i have a little g...|[i, have, a, litt...|[little, game, wr...|(262144,[5352,126...|[[1.2793449E7]]|
|330|i am working on a...|[i, am, working, ...|[working, collect...|(262144,[8538,978...|[[4.1105192E7]]|
|470|ive been writing ...|[ive, been, writi...

In [29]:
df_match = model.stages[-1].approxSimilarityJoin(sample, sample, 0.7)


#show non-duplicate matches only
df_match.select(fun.col('datasetA.id').alias('id_A'),
                 fun.col('datasetB.id').alias('id_B'),
                 fun.col('distCol')).filter('id_A < id_B').show()
end = time.time()
# return computational time needed to run the algorithm
print("Computation performed in " + str(round(end-start,2)) + " seconds")



+-------+-------+------------------+
|   id_A|   id_B|           distCol|
+-------+-------+------------------+
|  30800| 513330|0.6929133858267716|
| 103460|1251620|0.6666666666666667|
| 144530|1712670|0.6666666666666667|
| 144530| 155780|0.6666666666666667|
| 154630| 343230|             0.625|
| 218450| 727180|0.6732673267326732|
| 290080| 654450|           0.65625|
| 344460| 344550|0.6415094339622642|
| 406700| 844410|0.6923076923076923|
| 446500| 446600|0.5714285714285714|
| 503310| 835280|             0.375|
| 606820| 623990|             0.625|
| 612820| 634630|               0.5|
| 741900| 994270|0.5714285714285714|
| 897770| 905410|0.5061728395061729|
|1006060| 676920|0.5555555555555556|
|1107670|1107820|0.5866666666666667|
|1192680|1479100|0.6666666666666667|
|1257910|1544880|0.6944444444444444|
|1287340| 647750|             0.625|
+-------+-------+------------------+
only showing top 20 rows

Computation performed in 211.41 seconds


In [30]:
clean_df.where((col("id") == 612820) | (col("id") == 634630)).show(truncate=False)

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [31]:
clean_df.where((col("id") == 30800) | (col("id") == 513330)).show(truncate=False)

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [32]:
clean_df.where((col("id") == 503310) | (col("id") == 835280)).show(truncate=False)

+------+-------------------------------------------------------------------+
|Id    |Body                                                               |
+------+-------------------------------------------------------------------+
|503310|is it possible to map an enum as a string using fluent nhibernate\n|
|835280|its possible to map a view using fluent nhibernate if so how\n     |
+------+-------------------------------------------------------------------+

