In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, expr, col, when, udf, unix_timestamp

In [2]:
##Configuration

#config the connector jar file
spark = (SparkSession.builder.appName("SimpleSparkJob").master("spark://34.142.194.212:7077")
         .config("spark.jars", "/opt/spark/jars/gcs-connector-latest-hadoop2.jar")
         .config("spark.executor.memory", "2G")  #excutor excute only 2G
        .config("spark.driver.memory","4G") 
        .config("spark.executor.cores","1") #Cluster use only 3 cores to excute as it has 3 server
        .config("spark.python.worker.memory","1G") # each worker use 1G to excute
        .config("spark.driver.maxResultSize","3G") #Maximum size of result is 3G
        .config("spark.kryoserializer.buffer.max","1024M")
         .getOrCreate())
#config the credential to identify the google cloud hadoop file 
spark.conf.set("google.cloud.auth.service.account.json.keyfile","/opt/bucket_connector/lucky-wall-393304-3fbad5f3943c.json")
spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')

## Connect to the file in Google Bucket with Spark


# spark.show()

# spark.stop # Ending spark job



23/12/22 11:41:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/22 11:41:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/12/22 11:41:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [18]:
path=f"gs://it4043e-it5384/it4043e/it4043e_group7_problem1/final_final_preprocess"
tweet_final = spark.read \
    .format("parquet") \
    .option("inferSchema", "true") \
    .option("escape", "\"") \
    .option("multiline", "true") \
    .option("wholeFile", "true") \
.load(path)

In [4]:
tweet_final.columns

['username',
 'tweet_text',
 'datetime_timestamp',
 'tweet_id',
 'discussion_link',
 'images',
 'video_preview',
 'user_location',
 'user_description',
 'date',
 'time',
 'language_tweet',
 'language_description']

In [19]:
user_description_fill_value = ""

# Specify the fill value for time
time_fill_value = "00:00:00"

# Fill null values in user_description and time columns
tweet_final = tweet_final.na.fill({"user_description": user_description_fill_value, 
                                   "time": time_fill_value, 
                                   "language_tweet": user_description_fill_value, 
                                   "language_description": user_description_fill_value,
                                  "user_location": user_description_fill_value})

In [20]:
# Assuming `tweet_final` is your DataFrame
tweet_final = tweet_final.filter(tweet_final['username'].isNotNull())
tweet_final = tweet_final.filter(tweet_final['tweet_text'].isNotNull())

In [6]:
from pyspark.sql.functions import col,isnan, when, count
tweet_final.select([count(when(col(c).isNull(), c)).alias(c) for c in tweet_final.columns]
   ).show()


                                                                                

+--------+----------+------------------+--------+---------------+------+-------------+-------------+----------------+----+----+--------------+--------------------+
|username|tweet_text|datetime_timestamp|tweet_id|discussion_link|images|video_preview|user_location|user_description|date|time|language_tweet|language_description|
+--------+----------+------------------+--------+---------------+------+-------------+-------------+----------------+----+----+--------------+--------------------+
|       0|         0|                 0|       0|              0|     0|            0|            0|               0|   0|   0|             0|                   0|
+--------+----------+------------------+--------+---------------+------+-------------+-------------+----------------+----+----+--------------+--------------------+



In [29]:
from pyspark.sql.functions import col, expr
from pyspark.sql import functions as F
grouped_df = tweet_final.groupBy("username", "user_location").agg(
    F.expr("percentile_approx(time, 0.5)").alias("median_time"),
    F.expr("first(language_tweet, true)").alias("mode_tweet_language"),
    F.expr("first(language_description, true)").alias("mode_description_language"),
    F.concat_ws(" ", F.collect_list("user_description")).alias("concatenated_user_description"),
    F.concat_ws(" ", F.collect_list("tweet_text")).alias("concatenated_tweet_text")
)

In [30]:
grouped_df.show()



+--------------------+--------------------+-----------+-------------------+-------------------------+-----------------------------+-----------------------+
|            username|       user_location|median_time|mode_tweet_language|mode_description_language|concatenated_user_description|concatenated_tweet_text|
+--------------------+--------------------+-----------+-------------------+-------------------------+-----------------------------+-----------------------+
|                   0| Santa Fe, Argentina|    56120.0|            English|                         |                             |   #BTC fixes this h...|
|                   0|                 USA|    57119.0|            English|                  English|         #BTC $ETH \n$QNT ...|   Digital real esta...|
|            00100100|                    |    56568.0|            English|                  English|         i like to talk ab...|   Eventually, you w...|
|                0021|                 USA|    62805.0|         

                                                                                

In [31]:
grouped_df.printSchema()

root
 |-- username: string (nullable = true)
 |-- user_location: string (nullable = false)
 |-- median_time: double (nullable = true)
 |-- mode_tweet_language: string (nullable = true)
 |-- mode_description_language: string (nullable = true)
 |-- concatenated_user_description: string (nullable = false)
 |-- concatenated_tweet_text: string (nullable = false)



In [32]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.sql.functions import unix_timestamp
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Assuming 'tweet_final' is your DataFrame

# Tokenize text columns
tokenizer = Tokenizer(inputCol='username', outputCol='username_tokens')
grouped_df = tokenizer.transform(grouped_df)

tokenizer = Tokenizer(inputCol='concatenated_tweet_text', outputCol='tweet_tokens')
grouped_df = tokenizer.transform(grouped_df)

tokenizer = Tokenizer(inputCol='concatenated_user_description', outputCol='description_tokens')
grouped_df = tokenizer.transform(grouped_df)

tokenizer = Tokenizer(inputCol='mode_tweet_language', outputCol='lang_tweet_tokens')
grouped_df = tokenizer.transform(grouped_df)

tokenizer = Tokenizer(inputCol='mode_description_language', outputCol='lang_description_tokens')
grouped_df = tokenizer.transform(grouped_df)

tokenizer = Tokenizer(inputCol='user_location', outputCol='user_location_tokens')
grouped_df = tokenizer.transform(grouped_df)

#tokenizer = Tokenizer(inputCol='time_interval', outputCol='time_interval_tokens')
#tweet_final = tokenizer.transform(tweet_final)

In [33]:
# Apply TF-IDF to the tokenized text columns
hashingTF = HashingTF(inputCol='username_tokens', outputCol='username_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='username_tf', outputCol='username_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

hashingTF = HashingTF(inputCol='tweet_tokens', outputCol='tweet_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='tweet_tf', outputCol='tweet_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

hashingTF = HashingTF(inputCol='description_tokens', outputCol='description_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='description_tf', outputCol='description_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

hashingTF = HashingTF(inputCol='lang_tweet_tokens', outputCol='tweet_lang_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='tweet_lang_tf', outputCol='tweet_lang_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

hashingTF = HashingTF(inputCol='lang_description_tokens', outputCol='description_lang_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='description_lang_tf', outputCol='description_lang_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

hashingTF = HashingTF(inputCol='user_location_tokens', outputCol='user_location_tf')
grouped_df = hashingTF.transform(grouped_df)
idf = IDF(inputCol='user_location_tf', outputCol='user_location_tfidf')
grouped_df = idf.fit(grouped_df).transform(grouped_df)

'''hashingTF = HashingTF(inputCol='time_interval_tokens', outputCol='time_interval_tf')
tweet_final = hashingTF.transform(tweet_final)
idf = IDF(inputCol='time_interval_tf', outputCol='time_interval_tfidf')
tweet_final = idf.fit(tweet_final).transform(tweet_final)'''


                                                                                

"hashingTF = HashingTF(inputCol='time_interval_tokens', outputCol='time_interval_tf')\ntweet_final = hashingTF.transform(tweet_final)\nidf = IDF(inputCol='time_interval_tf', outputCol='time_interval_tfidf')\ntweet_final = idf.fit(tweet_final).transform(tweet_final)"

In [23]:
# Convert 'date' and 'time' to Unix timestamps
#tweet_final = tweet_final.withColumn('date', unix_timestamp('date', 'yy-MM-dd').cast('double'))
tweet_final = tweet_final.withColumn('time', unix_timestamp('time', 'HH:mm:ss').cast('double'))


In [14]:
min_time = tweet_final.agg({"time": "min"}).collect()[0]["min(time)"]
max_time = tweet_final.agg({"time": "max"}).collect()[0]["max(time)"]

normalized_time_expr = (col("time") - min_time) / (max_time - min_time)
tweet_final = tweet_final.withColumn("normalized_time", normalized_time_expr)

                                                                                

In [15]:
tweet_final.select('normalized_time', 'time').show()

[Stage 31:>                                                         (0 + 1) / 1]

+-------------------+-------+
|    normalized_time|   time|
+-------------------+-------+
| 0.6641975022859061|57386.0|
| 0.7181911827683191|62051.0|
|0.32502691003368095|28082.0|
|  0.506857718260628|43792.0|
|0.30815171471892033|26624.0|
| 0.9257977522887997|79988.0|
| 0.5400872695285825|46663.0|
| 0.5516614775633977|47663.0|
|  0.194863366474149|16836.0|
| 0.4181414136737694|36127.0|
| 0.1156842093079781| 9995.0|
| 0.2145626685494045|18538.0|
| 0.6755518003680598|58367.0|
|0.39498142339610415|34126.0|
|0.15673792520746768|13542.0|
| 0.8597784696582136|74284.0|
| 0.9130198266183637|78884.0|
| 0.3583722033819836|30963.0|
| 0.6633641593073993|57314.0|
| 0.5343464623433142|46167.0|
+-------------------+-------+
only showing top 20 rows



                                                                                

In [35]:
# Assemble features into a single vector column

feature_cols = ['username_tfidf',
                'tweet_tfidf',
                'description_tfidf',
                'tweet_lang_tfidf',
                'description_lang_tfidf',
                'median_time', 
                'user_location_tfidf']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features', handleInvalid='skip')
data = assembler.transform(grouped_df)



In [36]:
data.printSchema()

root
 |-- username: string (nullable = true)
 |-- user_location: string (nullable = false)
 |-- median_time: double (nullable = true)
 |-- mode_tweet_language: string (nullable = true)
 |-- mode_description_language: string (nullable = true)
 |-- concatenated_user_description: string (nullable = false)
 |-- concatenated_tweet_text: string (nullable = false)
 |-- username_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tweet_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lang_tweet_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lang_description_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_location_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- username_tf: vector (nullable = true)
 |-- username_tfid

In [37]:
# Set the number of clusters
kmeans = KMeans().setK(5).setSeed(1)

# Fit the model
model = kmeans.fit(data)

# Predict clusters
predictions = model.transform(data)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette score: {silhouette_score}")

# Shows the result
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

23/12/22 12:34:08 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:36:37 WARN DAGScheduler: Broadcasting large task binary with size 124.7 MiB
23/12/22 12:37:00 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:39:30 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:39:48 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:40:18 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:40:38 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:41:09 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:41:31 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:41:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/22 12:42:09 WARN DAGScheduler: Broadcasting large task binary with size 124.8 MiB
23/12/22 12:4

Silhouette score: 0.6877878692101088
Cluster Centers: 
[0.        0.        0.0002126 ... 0.        0.        0.       ]
[0.00012835 0.         0.00023389 ... 0.         0.         0.        ]
[0. 0. 0. ... 0. 0. 0.]
[0.         0.         0.00014424 ... 0.         0.         0.        ]
[0.00000000e+00 1.00238424e-04 9.13284930e-05 ... 1.00238424e-04
 0.00000000e+00 0.00000000e+00]


In [121]:
evaluator1 = ClusteringEvaluator(predictionCol="prediction", featuresCol="features", metricName="davies_bouldin")
db_index = evaluator.evaluate(predictions)
print(f"Davies-Bouldin Index: {db_index}")

23/12/21 12:14:58 WARN DAGScheduler: Broadcasting large task binary with size 163.4 MiB
23/12/21 12:15:48 WARN DAGScheduler: Broadcasting large task binary with size 80.1 MiB
[Stage 660:>                                                        (0 + 1) / 1]

Davies-Bouldin Index: -0.24849720185441587


                                                                                

In [39]:
predictions.select('username', 'median_time', 'user_location', 'prediction').show(50)

23/12/22 13:02:44 WARN DAGScheduler: Broadcasting large task binary with size 84.2 MiB
[Stage 136:>                                                        (0 + 1) / 1]

+--------------------+-----------+--------------------+----------+
|            username|median_time|       user_location|prediction|
+--------------------+-----------+--------------------+----------+
|                   0|    56120.0| Santa Fe, Argentina|         4|
|                   0|    57119.0|                 USA|         4|
|            00100100|    56568.0|                    |         4|
|                0021|    62805.0|                 USA|         4|
|             0060sol|    32840.0|            pfp: mcd|         3|
|      00michaeltalty|    47298.0|            new york|         2|
|             00press|    65719.0|                  PA|         4|
|01000001011100100...|    57470.0|                    |         4|
|01001010011011110...|    78604.0|        Aquí y ahora|         1|
|         01010010101|    28032.0|                    |         3|
|01029387382747338...|    32062.0|                    |         3|
|             010Coin|    39007.0|Rotterdam, Nederland|       

                                                                                

In [75]:
predictions.filter(predictions['username'] == '0xSuperTrooper').select('username', 'time', 'language_description','language_tweet', 'prediction').show()

23/12/21 11:34:27 WARN DAGScheduler: Broadcasting large task binary with size 80.1 MiB
[Stage 515:>                                                        (0 + 1) / 1]

+--------------+--------+--------------------+--------------+----------+
|      username|    time|language_description|language_tweet|prediction|
+--------------+--------+--------------------+--------------+----------+
|0xSuperTrooper|23:32:00|             English|       English|         0|
+--------------+--------+--------------------+--------------+----------+



                                                                                

In [40]:
predictions.groupBy(predictions.prediction).count().show()

23/12/22 13:05:32 WARN DAGScheduler: Broadcasting large task binary with size 184.8 MiB
23/12/22 13:07:55 WARN DAGScheduler: Broadcasting large task binary with size 184.7 MiB
[Stage 142:>                                                        (0 + 1) / 1]

+----------+------+
|prediction| count|
+----------+------+
|         1| 95866|
|         3| 78319|
|         4|123011|
|         2|116047|
|         0| 53123|
+----------+------+



                                                                                

In [47]:
predictions.filter((predictions.prediction == 1) & (predictions.user_location.isNotNull())).select('username', 'median_time', 'user_location').show(50)


23/12/22 13:13:45 WARN DAGScheduler: Broadcasting large task binary with size 84.2 MiB
[Stage 151:>                                                        (0 + 1) / 1]

+--------------------+-----------+--------------------+
|            username|median_time|       user_location|
+--------------------+-----------+--------------------+
|01001010011011110...|    78604.0|        Aquí y ahora|
|                0NFT|    74503.0|                    |
|                0hmm|    82516.0|                    |
|                  0x|    70873.0|       The Metaverse|
|          0x0smaneth|    86235.0|                    |
|            0x69Sinu|    85226.0|                    |
|         0xComΞtaeth|    79861.0|                    |
|        0xCryptology|    78426.0|  Eskişehir, Türkiye|
|        0xDigitalOil|    72980.0|                    |
|           0xDoodles|    71776.0| Ethereum Blockchain|
|           0xEvoleth|    77331.0|            Da Swamp|
|               0xHex|    72548.0|                    |
|   0xLamborghiniLady|    71059.0|                    |
|         0xLeMistral|    74174.0|   Le Mistral Palace|
|             0xLongs|    81405.0|              

                                                                                

In [53]:
from pyspark.sql.functions import desc
predictions.filter(predictions.prediction == 1).groupBy("user_location").count().orderBy(desc("count")).show()

23/12/22 13:25:13 WARN DAGScheduler: Broadcasting large task binary with size 184.8 MiB
23/12/22 13:28:08 WARN DAGScheduler: Broadcasting large task binary with size 184.7 MiB
[Stage 175:>                                                        (0 + 1) / 1]

+---------------+-----+
|  user_location|count|
+---------------+-----+
|               |44856|
|  United States| 1287|
|      Metaverse|  652|
|Los Angeles, CA|  568|
|London, England|  540|
|            USA|  491|
|         Canada|  448|
| United Kingdom|  429|
|  New York, USA|  420|
|          Earth|  407|
|California, USA|  406|
|        Nigeria|  336|
|      Australia|  280|
| Lagos, Nigeria|  256|
|      Miami, FL|  256|
|   New York, NY|  251|
|           Moon|  249|
|   Florida, USA|  236|
|         London|  233|
|          India|  229|
+---------------+-----+
only showing top 20 rows



                                                                                

In [55]:
predictions.select('mode_tweet_language').distinct().show()

23/12/22 13:32:36 WARN DAGScheduler: Broadcasting large task binary with size 24.1 MiB
23/12/22 13:33:36 WARN DAGScheduler: Broadcasting large task binary with size 24.1 MiB


+-------------------+
|mode_tweet_language|
+-------------------+
|            Turkish|
|            English|
|            Spanish|
|           Kurmanci|
|             French|
|                   |
|             German|
+-------------------+



                                                                                

In [58]:
predictions.printSchema()

root
 |-- username: string (nullable = true)
 |-- user_location: string (nullable = false)
 |-- median_time: double (nullable = true)
 |-- mode_tweet_language: string (nullable = true)
 |-- mode_description_language: string (nullable = true)
 |-- concatenated_user_description: string (nullable = false)
 |-- concatenated_tweet_text: string (nullable = false)
 |-- username_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tweet_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lang_tweet_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lang_description_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_location_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- username_tf: vector (nullable = true)
 |-- username_tfid

23/12/22 13:40:31 WARN DAGScheduler: Broadcasting large task binary with size 258.1 MiB
23/12/22 13:41:03 WARN TaskSetManager: Lost task 0.0 in stage 184.2 (TID 618) (10.148.0.23 executor 4): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:86)
	at java.base/java.lang.StringBuilder.<init>(StringBuilder.java:116)
	at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:487)
	at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:99)
	at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsString(ObjectWriter.java:1141)
	at org.json4s.jackson.JsonMethods.pretty(JsonMethods.scala:60)
	at org.json4s.jackson.JsonMethods.pretty$(JsonMethods.scala:58)
	at org.json4s.jackson.JsonMethods$.pretty(JsonMethods.scala:71)
	at org.apache.spark.sql.types.DataType.prettyJson(DataType.scala:70)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSuppo

In [59]:
predictions.select('username', 'user_location', 'median_time', 'mode_tweet_language','mode_description_language', 'concatenated_user_description', 'concatenated_tweet_text', 'prediction').write.parquet("gs://it4043e-it5384/it4043e/it4043e_group7_problem1/prediction",mode = "overwrite")

23/12/22 13:44:55 WARN DAGScheduler: Broadcasting large task binary with size 84.4 MiB
23/12/22 14:49:42 ERROR TaskSchedulerImpl: Lost executor 8 on 10.148.0.23: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/12/22 14:49:43 ERROR TaskSchedulerImpl: Lost executor 9 on 10.148.0.23: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/12/22 15:43:33 WARN TransportChannelHandler: Exception in connection from /34.142.194.212:7077
java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.Socket

In [5]:
spark.stop()