# Spark Prefix Span

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.fpm import PrefixSpan
from pyspark.sql import Row
import pyspark.sql.functions as F
import json
import pandas as pd
import ast
from tqdm import tqdm

## For all the lines at once

In [8]:
stops = pd.read_csv("data/timetables/gtfs3Sept/stops.txt")
stops_key = { i.get("stop_id"):i.get("stop_name") for i in stops[["stop_id", "stop_name"]].to_dict("records")}

In [21]:
l = 1
path = f"data/line_sequences/seq.data.{l}.txt"
seq = []
with open(path, "r") as dta:
    temp = []
    id = 0
    data = [row.strip().split() for row in dta.readlines()]
    for ent in data:
        if int(ent[0]) == id:
            temp.append([i for i in ent[3:]])
        else:
            seq.append(Row(sequence = temp))
            id = int(ent[0]) 
            temp = [[i for i in ent[3:]]]

In [22]:
spark = SparkSession.builder.getOrCreate()
sparkContext=spark.sparkContext
df = sparkContext.parallelize(seq).toDF()

In [23]:
prefixSpan = PrefixSpan()
prefixSpan.setMinSupport(0.1)
prefixSpan.setMaxPatternLength(5)
ps = prefixSpan.findFrequentSequentialPatterns(df).withColumn('len', F.size('sequence'))
df = ps.filter(ps.len > 1).sort(ps.freq.desc()).toPandas()
df["seq_stop_name"] = df.sequence.apply(lambda x : [[stops_key[sub] for sub in ss] for ss in x])
df.to_csv(f"data/seq/per_line/seq{l}.csv")


22/12/13 20:02:19 WARN PrefixSpan: Input data is not cached.
                                                                                

## For multiple line analysis

In [6]:
spark = SparkSession.builder.getOrCreate()
sparkContext=spark.sparkContext

def prefspan(inpath:str, line_number:int, stops_keys:dict, outpath_folder:str = "data/seq/per_line"):
    seq = []
    with open(inpath, "r") as dta:
        temp = []
        id = 0
        data = [row.strip().split() for row in dta.readlines()]
        for ent in data:
            if int(ent[0]) == id:
                temp.append([i for i in ent[3:]])
            else:
                seq.append(Row(sequence = temp))
                id = int(ent[0]) 
                temp = [[i for i in ent[3:]]]

    df = sparkContext.parallelize(seq).toDF()
    prefixSpan = PrefixSpan()
    prefixSpan.setMinSupport(0.01)
    prefixSpan.setMaxPatternLength(10)
    ps = prefixSpan.findFrequentSequentialPatterns(df).withColumn('len', F.size('sequence')).toPandas()
    del df
    ps = ps.withColumn('len', F.size('sequence'))
    df = ps.toPandas()
    del ps
    df["seq_stop_name"] = seq.sequence.apply(lambda x : [[stops_keys[sub] for sub in ss] for ss in ast.literal_eval(x)])
    df.to_csv(outpath_folder + f"/seq_{line_number}.csv", index=False)
    del df
    return
    


In [7]:
import os

folder_seq = "data/line_sequences"
dir_list = os.listdir(folder_seq)
for line in tqdm(dir_list):
    nbr = line.split(".")[2]
    filenm = folder_seq + "/" + line
    prefspan(filenm, nbr, stops_key)

  0%|          | 0/74 [00:00<?, ?it/s]


ConnectionRefusedError: [Errno 61] Connection refused

# STIB adaptation

In [120]:
import ast

In [111]:
seq = pd.read_csv("data/seq/seq.csv", index_col=0)
stops = pd.read_csv("data/timetables/gtfs3Sept/stops.txt")

In [132]:
stops_key = { i.get("stop_id"):i.get("stop_name") for i in stops[["stop_id", "stop_name"]].to_dict("records")}

In [131]:
seq["seq_stop_name"] = seq.sequence.apply(lambda x : [[stops_key[sub] for sub in ss] for ss in ast.literal_eval(x)])

In [133]:
seq.to_csv("data/seq/seq.csv", index=False)