In [168]:
#import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pyspark.pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, VectorIndexer, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, countDistinct, min as spark_min, max as spark_max, split, hour, minute, second, count, isnan, when, month, array
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

In [3]:
spark = SparkSession.builder.appName("KNNExample").getOrCreate()

In [4]:
data = spark.read.options(delimiter="\t", header=True, inferSchema=True).csv('../data/tweet.txt')
data.show(10)

+--------+------------------+------------+--------------+-------------+-------------------+-------------+---------------+--------------+
|tweet_id|     tweet_tweetId|tweet_userId|tweet_artistId|tweet_trackId|     tweet_datetime|tweet_weekday|tweet_longitude|tweet_latitude|
+--------+------------------+------------+--------------+-------------+-------------------+-------------+---------------+--------------+
|       1|167408118735699971|    24842995|         25040|       420585|2012-02-09 00:43:00|            3|       0.126573|       52.1976|
|       2|167408251737088000|    77147006|        122992|      2106213|2012-02-09 00:43:32|            3|       -80.1999|       36.0349|
|       3|185189790390558720|   174626103|        468091|      8030020|2012-03-29 02:21:01|            3|      -0.142822|       51.5207|
|       4|229940340692500480|   538669914|         67904|      1210996|2012-07-30 14:03:43|            0|        2.33475|       48.9009|
|       5|167408424307523584|   245555020

In [5]:
data

DataFrame[tweet_id: int, tweet_tweetId: bigint, tweet_userId: int, tweet_artistId: int, tweet_trackId: int, tweet_datetime: timestamp, tweet_weekday: int, tweet_longitude: double, tweet_latitude: double]

In [63]:
data.count()

1090726

In [64]:
data.groupby('tweet_trackId').count().sort('count', ascending=False).show(20)

+-------------+-----+
|tweet_trackId|count|
+-------------+-----+
|       141574| 4387|
|      1479214| 3331|
|      2966419| 2742|
|       141567| 2543|
|      4098232| 2532|
|     10608837| 2225|
|      6213179| 2116|
|      6213206| 2081|
|      1210996| 1993|
|       601022| 1941|
|      2706055| 1868|
|      6213223| 1851|
|      4860214| 1540|
|      8010733| 1536|
|      1072515| 1525|
|      6627282| 1509|
|      1379803| 1482|
|      5530557| 1480|
|      1379821| 1468|
|      2106361| 1432|
+-------------+-----+
only showing top 20 rows



In [37]:
# Użytkownicy
data.agg(countDistinct('tweet_trackId')).show()

+-----------------------------+
|count(DISTINCT tweet_trackId)|
+-----------------------------+
|                       134199|
+-----------------------------+



In [39]:
# Użytkownicy
data.agg(countDistinct('tweet_userId')).show()

+----------------------------+
|count(DISTINCT tweet_userId)|
+----------------------------+
|                      215375|
+----------------------------+



In [40]:
# Użytkownicy
data.agg(countDistinct('tweet_artistId')).show()

+------------------------------+
|count(DISTINCT tweet_artistId)|
+------------------------------+
|                         25081|
+------------------------------+



In [75]:
data.agg(spark_min('tweet_datetime')).show()

+-------------------+
|min(tweet_datetime)|
+-------------------+
|2011-11-09 12:18:57|
+-------------------+



In [76]:
data.agg(spark_max('tweet_datetime')).show()

+-------------------+
|max(tweet_datetime)|
+-------------------+
|2013-04-30 15:22:45|
+-------------------+



In [89]:
df = data[['tweet_trackId', 'tweet_longitude', 'tweet_latitude']]
df.show(10)

+-------------+---------------+--------------+
|tweet_trackId|tweet_longitude|tweet_latitude|
+-------------+---------------+--------------+
|       420585|       0.126573|       52.1976|
|      2106213|       -80.1999|       36.0349|
|      8030020|      -0.142822|       51.5207|
|      1210996|        2.33475|       48.9009|
|      4019560|        -47.818|      -21.2134|
|       598309|       -81.4733|       41.4433|
|      4814971|        3.35029|       6.57453|
|      7946138|        -112.18|       33.5156|
|      5494830|       -115.202|       36.2068|
|      7194015|       -122.913|       49.1994|
+-------------+---------------+--------------+
only showing top 10 rows



In [90]:
assembler = VectorAssembler(inputCols=["tweet_longitude", "tweet_latitude"], outputCol="location")
df = assembler.transform(df)
df.show(10)

+-------------+---------------+--------------+-------------------+
|tweet_trackId|tweet_longitude|tweet_latitude|           location|
+-------------+---------------+--------------+-------------------+
|       420585|       0.126573|       52.1976| [0.126573,52.1976]|
|      2106213|       -80.1999|       36.0349| [-80.1999,36.0349]|
|      8030020|      -0.142822|       51.5207|[-0.142822,51.5207]|
|      1210996|        2.33475|       48.9009|  [2.33475,48.9009]|
|      4019560|        -47.818|      -21.2134| [-47.818,-21.2134]|
|       598309|       -81.4733|       41.4433| [-81.4733,41.4433]|
|      4814971|        3.35029|       6.57453|  [3.35029,6.57453]|
|      7946138|        -112.18|       33.5156|  [-112.18,33.5156]|
|      5494830|       -115.202|       36.2068| [-115.202,36.2068]|
|      7194015|       -122.913|       49.1994| [-122.913,49.1994]|
+-------------+---------------+--------------+-------------------+
only showing top 10 rows



In [91]:
def euclidean_distance(point1, point2):
    return np.sqrt(np.sum((point1 - point2) ** 2))

def knn_predict(df, point, k):
    distances = df.rdd.map(lambda row: (row['tweet_trackId'], euclidean_distance(np.array(row['location']), np.array(point))))
    sorted_distances = distances.sortBy(lambda x: x[1])
    neighbors = sorted_distances.take(k)
    
    # Klasyfikacja na podstawie najczęściej występującej klasy w sąsiadach
    labels = [neighbor[0] for neighbor in neighbors]
    prediction = max(set(labels), key=labels.count)
    
    return prediction

In [92]:
%%time

new_point = [2.33475, 48.9009]
k = 10
prediction = knn_predict(df, new_point, k)
print(f"Predicted label for point {new_point} is {prediction}")

Predicted label for point [2.33475, 48.9009] is 44067
CPU times: user 33.5 ms, sys: 22.4 ms, total: 55.9 ms
Wall time: 33.7 s


# Adding new columns

In [6]:
data.show(10)

+--------+------------------+------------+--------------+-------------+-------------------+-------------+---------------+--------------+
|tweet_id|     tweet_tweetId|tweet_userId|tweet_artistId|tweet_trackId|     tweet_datetime|tweet_weekday|tweet_longitude|tweet_latitude|
+--------+------------------+------------+--------------+-------------+-------------------+-------------+---------------+--------------+
|       1|167408118735699971|    24842995|         25040|       420585|2012-02-09 00:43:00|            3|       0.126573|       52.1976|
|       2|167408251737088000|    77147006|        122992|      2106213|2012-02-09 00:43:32|            3|       -80.1999|       36.0349|
|       3|185189790390558720|   174626103|        468091|      8030020|2012-03-29 02:21:01|            3|      -0.142822|       51.5207|
|       4|229940340692500480|   538669914|         67904|      1210996|2012-07-30 14:03:43|            0|        2.33475|       48.9009|
|       5|167408424307523584|   245555020

In [53]:
data_new = data.withColumn("total_seconds", (hour("tweet_datetime") * 3600 + minute("tweet_datetime") * 60 + second("tweet_datetime")).cast('double'))
data_new = data_new.withColumn("month", month("tweet_datetime"))
data_new = data_new.drop("tweet_tweetId", "tweet_datetime")
data_new = data_new.withColumn("tweet_weekday", col("tweet_weekday") + 1)
data_new.show(10)

+--------+------------+--------------+-------------+-------------+---------------+--------------+-------------+-----+
|tweet_id|tweet_userId|tweet_artistId|tweet_trackId|tweet_weekday|tweet_longitude|tweet_latitude|total_seconds|month|
+--------+------------+--------------+-------------+-------------+---------------+--------------+-------------+-----+
|       1|    24842995|         25040|       420585|            4|       0.126573|       52.1976|       2580.0|    2|
|       2|    77147006|        122992|      2106213|            4|       -80.1999|       36.0349|       2612.0|    2|
|       3|   174626103|        468091|      8030020|            4|      -0.142822|       51.5207|       8461.0|    3|
|       4|   538669914|         67904|      1210996|            1|        2.33475|       48.9009|      50623.0|    7|
|       5|   245555020|        223140|      4019560|            4|        -47.818|      -21.2134|       2653.0|    2|
|       6|   265101134|         33886|       598309|    

# Encoding

In [128]:
data_new.columns

['tweet_id',
 'tweet_userId',
 'tweet_artistId',
 'tweet_trackId',
 'tweet_weekday',
 'tweet_longitude',
 'tweet_latitude',
 'total_seconds',
 'month']

In [198]:
# Zmienne numeryczne
numeric_features = ['tweet_longitude', 'tweet_latitude', 'total_seconds']

# Zmienne kategoryczne
categorical_features = ['tweet_weekday', 'month']

# Label (zmienna docelowa)
label = 'tweet_trackId'

# Indeksowanie zmiennych kategorycznych
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_features]

# Użyj OneHotEncoder z opcją dropLast=False
encoders = [OneHotEncoder(inputCol=column + "_index", outputCol=column + "_encoded", dropLast=False) for column in categorical_features]

# Tworzenie pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Dopasowanie pipeline do danych i przekształcenie danych
model = pipeline.fit(data_new[['tweet_trackId', 'tweet_weekday','tweet_longitude', 'tweet_latitude', 'total_seconds', 'month']])
data_new_2 = model.transform(data_new[['tweet_trackId', 'tweet_weekday','tweet_longitude', 'tweet_latitude', 'total_seconds', 'month']])

for column in categorical_features:
    # Konwersja wektora one-hot na tablicę
    data_new_2 = data_new_2.withColumn(column + "_encoded_array", vector_to_array(col(column + "_encoded")))
    # Dynamically select the one-hot encoded columns
    encoded_columns = [col(f"{column}_encoded_array")[i].alias(f"{column}_encoded_{i}") for i in range(len(data_new_2.select(column + "_encoded_array").first()[0]))]
    # Select all columns + the new one-hot encoded columns
    data_new_2 = data_new_2.select("*", *encoded_columns)

    # Usuń kolumny pośrednie
    data_new_2 = data_new_2.drop(column, column + "_index", column + "_encoded", column + "_encoded_array")

data_new_2.show(5)

+-------------+---------------+--------------+-------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+
|tweet_trackId|tweet_longitude|tweet_latitude|total_seconds|tweet_weekday_encoded_0|tweet_weekday_encoded_1|tweet_weekday_encoded_2|tweet_weekday_encoded_3|tweet_weekday_encoded_4|tweet_weekday_encoded_5|tweet_weekday_encoded_6|month_encoded_0|month_encoded_1|month_encoded_2|month_encoded_3|month_encoded_4|month_encoded_5|month_encoded_6|month_encoded_7|month_encoded_8|month_encoded_9|month_encoded_10|month_encoded_11|
+-------------+---------------+--------------+-------------+-----------------------+-----------------------+-----------------------+----------------------

In [199]:
data_new_2

DataFrame[tweet_trackId: int, tweet_longitude: double, tweet_latitude: double, total_seconds: double, tweet_weekday_encoded_0: double, tweet_weekday_encoded_1: double, tweet_weekday_encoded_2: double, tweet_weekday_encoded_3: double, tweet_weekday_encoded_4: double, tweet_weekday_encoded_5: double, tweet_weekday_encoded_6: double, month_encoded_0: double, month_encoded_1: double, month_encoded_2: double, month_encoded_3: double, month_encoded_4: double, month_encoded_5: double, month_encoded_6: double, month_encoded_7: double, month_encoded_8: double, month_encoded_9: double, month_encoded_10: double, month_encoded_11: double]

# Standarization

In [200]:
features = data_new_2.columns
features.remove('tweet_trackId')
features

['tweet_longitude',
 'tweet_latitude',
 'total_seconds',
 'tweet_weekday_encoded_0',
 'tweet_weekday_encoded_1',
 'tweet_weekday_encoded_2',
 'tweet_weekday_encoded_3',
 'tweet_weekday_encoded_4',
 'tweet_weekday_encoded_5',
 'tweet_weekday_encoded_6',
 'month_encoded_0',
 'month_encoded_1',
 'month_encoded_2',
 'month_encoded_3',
 'month_encoded_4',
 'month_encoded_5',
 'month_encoded_6',
 'month_encoded_7',
 'month_encoded_8',
 'month_encoded_9',
 'month_encoded_10',
 'month_encoded_11']

In [201]:
df = data_new_2.withColumn("features", array(*features))

to_vector_udf = udf(lambda array: DenseVector(array), VectorUDT())

df = df.withColumn("features", to_vector_udf(df["features"]))

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

model = scaler.fit(df)
df = model.transform(df)

df.select('features', 'scaled_features').show(5)

+--------------------+--------------------+
|            features|     scaled_features|
+--------------------+--------------------+
|[0.126573,52.1976...|[0.00186958583067...|
|[-80.1999,36.0349...|[-1.1846175460916...|
|[-0.142822,51.520...|[-0.0021095967347...|
|[2.33475,48.9009,...|[0.03448615042833...|
|[-47.818,-21.2134...|[-0.7063106290532...|
+--------------------+--------------------+
only showing top 5 rows



In [202]:
df.select('features', 'scaled_features').head(5)[0]

Row(features=DenseVector([0.1266, 52.1976, 2580.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), scaled_features=DenseVector([0.0019, 1.896, 0.104, 0.0, 0.0, 2.8492, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.1823, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]))

In [203]:
df

DataFrame[tweet_trackId: int, tweet_longitude: double, tweet_latitude: double, total_seconds: double, tweet_weekday_encoded_0: double, tweet_weekday_encoded_1: double, tweet_weekday_encoded_2: double, tweet_weekday_encoded_3: double, tweet_weekday_encoded_4: double, tweet_weekday_encoded_5: double, tweet_weekday_encoded_6: double, month_encoded_0: double, month_encoded_1: double, month_encoded_2: double, month_encoded_3: double, month_encoded_4: double, month_encoded_5: double, month_encoded_6: double, month_encoded_7: double, month_encoded_8: double, month_encoded_9: double, month_encoded_10: double, month_encoded_11: double, features: vector, scaled_features: vector]

# KNN

In [205]:
def euclidean_distance(point1, point2):
    return np.sqrt(np.sum((point1 - point2) ** 2))

def knn_predict(df, point, k):
    distances = df.rdd.map(lambda row: (row['tweet_trackId'], euclidean_distance(np.array(row['scaled_features']), np.array(point))))
    sorted_distances = distances.sortBy(lambda x: x[1])
    neighbors = sorted_distances.take(k)
    
    # Klasyfikacja na podstawie najczęściej występującej klasy w sąsiadach
    labels = [neighbor[0] for neighbor in neighbors]
    prediction = max(set(labels), key=labels.count)
    
    return prediction

In [166]:
list(df.head(5)[0])[0]

420585

In [162]:
list(df.head(5)[0])[-1]

[0.126573,
 52.1976,
 2580.0,
 0.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0]

In [206]:
%%time

new_point = np.array(list(df.head(5)[0])[-1])
k = 10
prediction = knn_predict(df, new_point, k)
print("tweet_trackId:", prediction)

tweet_trackId: 1231910
CPU times: user 56.9 ms, sys: 9.19 ms, total: 66.1 ms
Wall time: 1min 36s


In [236]:
def euclidean_distance(point1, point2):
    return np.sqrt(np.sum((point1 - point2) ** 2))

def knn_predict(df, point, k):
    distances = df.rdd.map(lambda row: (row['tweet_trackId'], euclidean_distance(np.array(row['scaled_features']), np.array(point))))
    sorted_distances = distances.sortBy(lambda x: x[1])
    neighbors = sorted_distances.take(k)
    return neighbors
    
    # Klasyfikacja na podstawie najczęściej występującej klasy w sąsiadach
    labels = np.array(x, dtype=int)[:, 0]
    return max(set(labels), key=labels.count)

In [216]:
%%time

new_point = np.array(list(df.head(5)[0])[-1])
k = 10
prediction = knn_predict(df, new_point, k)
prediction

CPU times: user 42.7 ms, sys: 15.3 ms, total: 58 ms
Wall time: 1min 41s


[(420585, 0.0),
 (1479214, 0.016348467588064672),
 (5240295, 0.024065119368573442),
 (3256970, 0.024908307711726408),
 (4128619, 0.02490866639313596),
 (9769460, 0.025187946250050924),
 (1478230, 0.025231472478986047),
 (6832605, 0.025520938375185286),
 (1231910, 0.025538626883347754),
 (4119415, 0.025596127709824158)]

In [243]:
x = [(420585, 0.0),
 (1479214, 0.016348467588064672),
 (5240295, 0.024065119368573442),
 (3256970, 0.024908307711726408),
 (4128619, 0.02490866639313596),
 (420585, 0.025187946250050924),
 (1478230, 0.025231472478986047),
 (6832605, 0.025520938375185286),
 (1231910, 0.025538626883347754),
 (4119415, 0.025596127709824158)]
x

[(420585, 0.0),
 (1479214, 0.016348467588064672),
 (5240295, 0.024065119368573442),
 (3256970, 0.024908307711726408),
 (4128619, 0.02490866639313596),
 (420585, 0.025187946250050924),
 (1478230, 0.025231472478986047),
 (6832605, 0.025520938375185286),
 (1231910, 0.025538626883347754),
 (4119415, 0.025596127709824158)]

In [244]:
labels = np.array(x, dtype=int)[:, 0]
set(labels)

{420585,
 1231910,
 1478230,
 1479214,
 3256970,
 4119415,
 4128619,
 5240295,
 6832605}

In [232]:
labels = [neighbor[0] for neighbor in x]
#(set(labels), labels.count)
set(labels)

{420585,
 1231910,
 1478230,
 1479214,
 3256970,
 4119415,
 4128619,
 5240295,
 6832605}

In [235]:
labels.count(147914)

0