In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
    .master("local") \
    .appName("Test it 2") \
    .getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.sql import types

In [3]:
types.StructType?

In [4]:
from pyspark.sql import types
from pyspark.sql.types import StructType, LongType, StructField, TimestampType

In [5]:
StructField?

In [6]:
from pyspark.sql import types
from pyspark.sql.types import StructType, LongType, StructField, TimestampType, StringType

# root
#  |-- train_id: long (nullable = true)
#  |-- train_name: string (nullable = true)
#  |-- schedule_date: timestamp (nullable = true)
#  |-- schedule_id: long (nullable = true)
#  |-- arrival_delay: long (nullable = true)
#  |-- arrival_time: timestamp (nullable = true)
#  |-- departure_delay: long (nullable = true)
#  |-- departure_time: timestamp (nullable = true)
#  |-- station_name: string (nullable = true)

schema = StructType([
        StructField('train_id', LongType()),
        StructField('train_name', StringType()),
        StructField('schedule_date', TimestampType()),
        StructField('schedule_id', LongType()),
        StructField('arrival_delay', LongType()),
        StructField('arrival_time', TimestampType()),
        StructField('departure_delay', LongType()),
        StructField('departure_time', TimestampType()),
        StructField('station_name', StringType())
])
    
trains = spark.read.format("csv")\
        .schema(schema)\
        .option("inferSchema", "true")\
        .load("trains_sample.csv")

In [7]:
trains.printSchema()

root
 |-- train_id: long (nullable = true)
 |-- train_name: string (nullable = true)
 |-- schedule_date: timestamp (nullable = true)
 |-- schedule_id: long (nullable = true)
 |-- arrival_delay: long (nullable = true)
 |-- arrival_time: timestamp (nullable = true)
 |-- departure_delay: long (nullable = true)
 |-- departure_time: timestamp (nullable = true)
 |-- station_name: string (nullable = true)



In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


In [9]:
trains.registerTempTable("trains")

In [10]:
sqlContext.sql("""
    Select * from trains LIMIT 5
""").show()

+--------+------------------+-------------------+-----------+-------------+-------------------+---------------+-------------------+--------------------+
|train_id|        train_name|      schedule_date|schedule_id|arrival_delay|       arrival_time|departure_delay|     departure_time|        station_name|
+--------+------------------+-------------------+-----------+-------------+-------------------+---------------+-------------------+--------------------+
|     159|67900/1 KAMIEŃCZYK|2018-12-08 00:00:00|   53466607|         null|               null|              0|2018-12-08 13:17:00|Szklarska Poręba ...|
|     159|67900/1 KAMIEŃCZYK|2018-12-08 00:00:00|   53466607|            0|2018-12-08 13:21:00|              0|2018-12-08 13:21:30|Szklarska Poręba ...|
|     159|67900/1 KAMIEŃCZYK|2018-12-08 00:00:00|   53466607|            0|2018-12-08 13:27:00|              0|2018-12-08 13:27:30|Szklarska Poręba ...|
|     159|67900/1 KAMIEŃCZYK|2018-12-08 00:00:00|   53466607|            0|2018-12

In [11]:
pip install requests

Collecting requests
  Using cached https://files.pythonhosted.org/packages/51/bd/23c926cd341ea6b7dd0b2a00aba99ae0f828be89d72b2190f27c11d4b7fb/requests-2.22.0-py2.py3-none-any.whl
Collecting idna<2.9,>=2.5 (from requests)
  Using cached https://files.pythonhosted.org/packages/14/2c/cd551d81dbe15200be1cf41cd03869a46fe7226e7450af7a6545bfc474c9/idna-2.8-py2.py3-none-any.whl
Collecting chardet<3.1.0,>=3.0.2 (from requests)
  Using cached https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl
Collecting urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1 (from requests)
  Using cached https://files.pythonhosted.org/packages/e8/74/6e4f91745020f967d09332bb2b8b9b10090957334692eb88ea4afe91b77f/urllib3-1.25.8-py2.py3-none-any.whl
Collecting certifi>=2017.4.17 (from requests)
  Using cached https://files.pythonhosted.org/packages/b9/63/df50cac98ea0d5b006c55a399c3bf1db9da7b5a24de7890bc9cfd5dd9e99/certifi-2019.11.28-py2.py3-n

In [75]:
LOCATION_URL = 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id={}&ed=0&okno=polozenie'

In [76]:
urls = [
    LOCATION_URL.format(_id)
    for _id in range(0, 40000)
]

In [77]:
urls

['https://www.bazakolejowa.pl/index.php?dzial=stacje&id=0&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=1&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=2&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=3&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=4&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=5&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=6&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=7&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=8&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=9&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=11&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.ph

In [15]:
example = 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=20&ed=0&okno=polozenie'

In [16]:
import requests

In [17]:
r = requests.get(example)

In [21]:
example_coords_text = """
    <script>
    mapInit(17.305097579956, 52.947267399096,9);
        x = createStationMarker(17.305097579956, 52.947267399096,20, 'Gołańcz', 'wielkopolskie');
        x.addTo(map); //.openPopup();
            map.addLayer(osmGS)

</script>
"""

In [53]:
import re


def extract_coords(text):

    fn_match = re.search(r"createStationMarker\(.+\)", text)
    try:
        if fn_match:
            text = fn_match[0]
            text = text.replace('createStationMarker(', '',)[:-1]
            parts = text.split(',')
            parts = [part.strip() for part in parts]
            parts = [part.replace("'", "") for part in parts]
            return (float(parts[0]), float(parts[1]), parts[3])
    except Error as e:
        return None

assert (17.305097579956, 52.947267399096, 'Gołańcz') == extract_coords(example_coords_text)

In [72]:
urls

['https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.pl/index.php?dzial=stacje&id=10&ed=0&okno=polozenie',
 'https://www.bazakolejowa.p

In [78]:
rs = (requests.get(url) for url in urls)
text = (r.text for r in rs)
coords = map(extract_coords, text)
coords = filter(lambda coord: coord is not None, coords)

In [79]:
first_100 = (next(coords) for x in range(100))

In [80]:
data = sc.parallelize(first_100)

In [81]:
data.take(10)

[(17.734551429749, 52.849774139808, 'Żnin'),
 (17.4971, 52.843, 'Damasławek'),
 (17.2024, 52.8095, 'Wągrowiec'),
 (17.14403629303, 52.634000538908, 'Sława Wielkopolska'),
 (16.9112, 52.4013, 'Poznań Główny'),
 (18.244, 52.8049, 'Inowrocław'),
 (17.4856, 52.7556, 'Janowiec Wielkopolski'),
 (17.6047, 52.5296, 'Gniezno'),
 (17.946, 52.6582, 'Mogilno'),
 (16.8293, 52.6574, 'Oborniki Wielkopolskie')]

In [82]:
DF = sqlContext.createDataFrame(data, ['lat', 'lng', 'station_name'])

In [84]:
DF.printSchema()

root
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- station_name: string (nullable = true)



In [85]:
DF.registerTempTable('stations')

In [86]:
sqlContext.sql("""
    Select * from stations LIMIT 5
""").show()

+---------------+---------------+------------------+
|            lat|            lng|      station_name|
+---------------+---------------+------------------+
|17.734551429749|52.849774139808|              Żnin|
|        17.4971|         52.843|        Damasławek|
|        17.2024|        52.8095|         Wągrowiec|
| 17.14403629303|52.634000538908|Sława Wielkopolska|
|        16.9112|        52.4013|     Poznań Główny|
+---------------+---------------+------------------+



In [87]:
sqlContext.sql("""
    Select * from trains t
    JOIN stations s
    ON s.station_name = t.station_name
""").show()

+--------+----------+-------------------+-----------+-------------+-------------------+---------------+-------------------+------------+-------+-------+------------+
|train_id|train_name|      schedule_date|schedule_id|arrival_delay|       arrival_time|departure_delay|     departure_time|station_name|    lat|    lng|station_name|
+--------+----------+-------------------+-----------+-------------+-------------------+---------------+-------------------+------------+-------+-------+------------+
|      89|   90222/3|2018-12-08 00:00:00|   53465210|           10|2018-12-08 11:27:00|             10|2018-12-08 11:29:00|Toruń Główny|18.6146|52.9999|Toruń Główny|
|      89|   90222/3|2018-12-07 00:00:00|   53453561|            0|2018-12-07 11:27:00|              0|2018-12-07 11:29:00|Toruń Główny|18.6146|52.9999|Toruń Główny|
|      89|   90222/3|2018-12-06 00:00:00|   53443076|            0|2018-12-06 11:27:00|              0|2018-12-06 11:29:00|Toruń Główny|18.6146|52.9999|Toruń Główny|
|   


### pozostaje narysować mapę najbardziej opóźnionych stacji, połączeń, siatki połączęń itp 


### jak wyglada relacja przykładowego pociagu

In [93]:
#89

sqlContext.sql("""
    Select train_id, t.station_name, lat, lng from trains t
    LEFT JOIN stations s
    ON s.station_name = t.station_name
    WHERE train_id = 89
    AND schedule_id = 53465210
    ORDER by arrival_time ASC
""").show()

+--------+------------------+-------+-------+
|train_id|      station_name|    lat|    lng|
+--------+------------------+-------+-------+
|      89|  Bydgoszcz Główna|17.9917|53.1354|
|      89|              null|   null|   null|
|      89| Bydgoszcz Bielawy|   null|   null|
|      89|  Bydgoszcz Wschód|   null|   null|
|      89|              null|   null|   null|
|      89|    Solec Kujawski|   null|   null|
|      89|         Przyłubie|   null|   null|
|      89|    Cierpice Kąkol|   null|   null|
|      89|    Toruń Kluczyki|   null|   null|
|      89|      Toruń Główny|18.6146|52.9999|
|      89|      Toruń Miasto|   null|   null|
|      89|    Toruń Wschodni|   null|   null|
|      89|  Papowo Toruńskie|   null|   null|
|      89|              null|   null|   null|
|      89|  Kamionki Jezioro|   null|   null|
|      89|  Rychnowo Wielkie|   null|   null|
|      89|Kowalewo Pomorskie|   null|   null|
|      89|            Zieleń|   null|   null|
|      89|         Wąbrzeźno|   nu