# Installing Spark

In [1]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=72a6db694e1f03337b61f0d2d8ae1d223903430de9ab2a72b34fd914fe1df328
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

# Downloading the datasets

In [2]:
import requests

with open("stations.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/stations.csv")

    print("Downloading stations.csv...")

    f.write(request.content)

Downloading stations.csv...


In [3]:
!ls

sample_data  stations.csv


In [4]:
with open("trips.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/trips.csv")

    print("Downloading trips.csv...")

    f.write(request.content)

Downloading trips.csv...


# Hodl...

In [5]:
!ls

sample_data  stations.csv  trips.csv


# Initialization

In [6]:
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime

In [7]:
conf = SparkConf().setAppName("Introduction to Apache Spark")

sc = SparkContext(conf=conf)

In [8]:
tripData = sc.textFile("/content/trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader)

# у первой строчки нету start_date
firstRow = trips.first()
trips = trips.filter(lambda row: row != firstRow)

# у второй строчки нету duration
secondRow = trips.first()
trips = trips.filter(lambda row: row != secondRow)

stationData = sc.textFile("/content/stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(","))

In [9]:
stationData.take(5)

['id,name,lat,long,dock_count,city,installation_date',
 '2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013',
 '3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013',
 '4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013',
 '5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013']

In [10]:
stations.take(5)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013'],
 ['4',
  'Santa Clara at Almaden',
  '37.333988',
  '-121.894902',
  '11',
  'San Jose',
  '8/6/2013'],
 ['5',
  'Adobe on Almaden',
  '37.331415',
  '-121.8932',
  '19',
  'San Jose',
  '8/5/2013'],
 ['6',
  'San Pedro Square',
  '37.336721000000004',
  '-121.894074',
  '15',
  'San Jose',
  '8/7/2013']]

In [11]:
tripData.take(5)

['id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 '4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127',
 '4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138',
 '4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214',
 '4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060']

In [12]:
# Я отфильтровал первые 2 строчки в csv файле, потому что у них
# не определены start_date и duration
# и из-за этого у меня падает инициализация
trips.take(5)

['4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214',
 '4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060',
 '4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103',
 '4927,103,8/29/2013 18:54,Golden Gate at Polk,59,8/29/2013 18:56,Golden Gate at Polk,59,527,Subscriber,94109',
 '4500,109,8/29/2013 13:25,Santa Clara at Almaden,4,8/29/2013 13:27,Adobe on Almaden,5,679,Subscriber,95112']

In [13]:
trips = trips.map(lambda row: row.split(","))

In [14]:
trips.take(5)

[['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214'],
 ['4251',
  '77',
  '8/29/2013 11:29',
  'San Jose City Hall',
  '10',
  '8/29/2013 11:30',
  'San Jose City Hall',
  '10',
  '26',
  'Subscriber',
  '95060'],
 ['4299',
  '83',
  '8/29/2013 12:02',
  'South Van Ness at Market',
  '66',
  '8/29/2013 12:04',
  'Market at 10th',
  '67',
  '319',
  'Subscriber',
  '94103'],
 ['4927',
  '103',
  '8/29/2013 18:54',
  'Golden Gate at Polk',
  '59',
  '8/29/2013 18:56',
  'Golden Gate at Polk',
  '59',
  '527',
  'Subscriber',
  '94109'],
 ['4500',
  '109',
  '8/29/2013 13:25',
  'Santa Clara at Almaden',
  '4',
  '8/29/2013 13:27',
  'Adobe on Almaden',
  '5',
  '679',
  'Subscriber',
  '95112']]

In [22]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
        )

In [16]:
stationsInternal = stations.mapPartitions(initStation)
tripsInternal = trips.mapPartitions(initTrip)

In [17]:
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [18]:
tripsInternal.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

# 1. Найти велосипед с максимальным временем пробега.

In [19]:
bike_with_longest_duration = tripsInternal.keyBy(lambda trip: trip.bike_id) \
.mapValues(lambda trip: trip.duration) \
.reduceByKey(lambda firstDuration, secondDuration: firstDuration + secondDuration) \
.sortBy(lambda trip: trip[1], ascending=False) \
.first()

print("Bike id is " + str(bike_with_longest_duration[0]) + ' and maximum travel time is ' + str(bike_with_longest_duration[1]))

Bike id is 535 and maximum travel time is 18611693


# 2. Найти наибольшее геодезическое расстояние между станциями.



In [26]:
def distance(a, b):
  dist1 = (a.lat ** 2 - b.lat ** 2) + (a.long ** 2 - b.long ** 2)
  dist2 = (b.lat ** 2 - a.lat ** 2) + (b.long ** 2 - a.long ** 2)

  return dist1 ** 0.5 if dist1 >= 0 else dist2 ** 0.5

result = stationsInternal.cartesian(stationsInternal) \
.map(lambda pair: (pair[0].name, pair[1].name, distance(pair[0], pair[1]))) \
.sortBy(lambda station: station[2], ascending=False) \
.first()

print(f"From station {result[0]} to station {result[1]} the distance is: {result[2]}")

From station SJSU - San Salvador at 9th to station Golden Gate at Polk the distance is: 12.877539087729783


# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [None]:
paths = tripsInternal.filter(lambda trip: trip.bike_id == bike_with_longest_duration[0]) \
  .sortBy(lambda trip: trip.start_date) \
  .take(10)

for path in paths:
  print(f"From station: {path.start_station_name} to station: {path.end_station_name}")

From station: Post at Kearney to station: San Francisco Caltrain (Townsend at 4th)
From station: San Francisco Caltrain (Townsend at 4th) to station: San Francisco Caltrain 2 (330 Townsend)
From station: San Francisco Caltrain 2 (330 Townsend) to station: Market at Sansome
From station: Market at Sansome to station: 2nd at South Park
From station: 2nd at Townsend to station: Davis at Jackson
From station: San Francisco City Hall to station: Civic Center BART (7th at Market)
From station: Civic Center BART (7th at Market) to station: Post at Kearney
From station: Post at Kearney to station: Embarcadero at Sansome
From station: Embarcadero at Sansome to station: Washington at Kearney
From station: Washington at Kearney to station: Market at Sansome


# 4. Найти количество велосипедов в системе.

In [None]:
bikes_count = tripsInternal.map(lambda trip: trip.bike_id) \
.distinct() \
.count()

print(bikes_count)

700


# 5. Найти пользователей потративших на поездки более 3 часов.

In [None]:
# В датасете нету как такового идентификатора пользователя
# поэтому я вывел их почтовой индекс, так как bike_id общий
subscribers = tripsInternal.keyBy(lambda trip: trip.zip_code) \
  .mapValues(lambda trip: trip.duration) \
  .reduceByKey(lambda firstDuration, secondDuration: firstDuration + secondDuration) \
  .filter(lambda trip: trip[1] > 3 * 60 * 60 and trip[0] != 'nil' and trip[0] != "") \
  .take(10)

for zip_code_of_subscriber in subscribers:
  print(zip_code_of_subscriber)

('95060', 758576)
('94109', 12057128)
('94061', 3049397)
('94612', 1860796)
('95138', 155295)
('94123', 1895963)
('94133', 21637675)
('94960', 1439873)
('94131', 3143302)
('1719', 24561)
