In [1]:
!pip3 install pyspark==3.0.0



In [2]:
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime
import numpy as np

In [3]:
conf = SparkConf().setAppName("Lab1_Script")
sc = SparkContext(conf=conf)
sc

In [4]:
tripData = sc.textFile("trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [5]:
stationData.take(5)

['id,name,lat,long,dock_count,city,installation_date',
 '2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013',
 '3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013',
 '4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013',
 '5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013']

In [6]:
stations.take(5)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013'],
 ['4',
  'Santa Clara at Almaden',
  '37.333988',
  '-121.894902',
  '11',
  'San Jose',
  '8/6/2013'],
 ['5',
  'Adobe on Almaden',
  '37.331415',
  '-121.8932',
  '19',
  'San Jose',
  '8/5/2013'],
 ['6',
  'San Pedro Square',
  '37.336721000000004',
  '-121.894074',
  '15',
  'San Jose',
  '8/7/2013']]

In [7]:
tripData.take(5)

['id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 '4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127',
 '4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138',
 '4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214',
 '4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060']

In [8]:
trips.take(5)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138'],
 ['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214'],
 ['4251',
  '77',
  '8/29/2013 11:29',
  'San Jose City Hall',
  '10',
  '8/29/2013 11:30',
  'San Jose City Hall',
  '10',
  '26',
  'Subscriber',
  '95060'],
 ['4299',
  '83',
  '8/29/2013 12:02',
  'South Van Ness at Market',
  '66',
  '8/29/2013 12:04',
  'Market at 10th',
  '67',
  '319',
  'Subscriber',
  '94103']]

In [9]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [10]:
stationsInternal = stations.mapPartitions(initStation)
tripsInternal = trips.mapPartitions(initTrip)

In [11]:
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [12]:
tripsInternal.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

# 1. Найти велосипед с максимальным временем пробега.

In [13]:
bike_with_longest_duration = tripsInternal.keyBy(lambda trip: trip.bike_id) \
  .mapValues(lambda trip: trip.duration) \
  .reduceByKey(lambda firstDuration, secondDuration: firstDuration + secondDuration) \
  .sortBy(lambda trip: trip[1], ascending=False) \
  .first()

print(f'Bike: {bike_with_longest_duration[0]} - time: {bike_with_longest_duration[1]}')

Bike: 535 - time: 18611693


# 2. Найти наибольшее геодезическое расстояние между станциями.



In [14]:
import math

def distance(a, b):
  #pi - число pi, rad - радиус сферы (Земли)
  rad = 6372

  # в радианах
  lat1  = a.lat  * math.pi / 180.
  lat2  = b.lat  * math.pi / 180.
  long1 = a.long * math.pi / 180.
  long2 = b.long * math.pi / 180.

  # косинусы и синусы широт и разницы долгот
  cl1 = math.cos(lat1)
  cl2 = math.cos(lat2)
  sl1 = math.sin(lat1)
  sl2 = math.sin(lat2)
  delta = long2 - long1
  cdelta = math.cos(delta)
  sdelta = math.sin(delta)

  # вычисления длины большого круга
  y = math.sqrt(math.pow(cl2 * sdelta, 2) + math.pow(cl1 * sl2 - sl1 * cl2 * cdelta,2))
  x = sl1 * sl2 + cl1 * cl2 * cdelta
  ad = math.atan2(y, x)
  dist = ad * rad
  return dist

res = stationsInternal.cartesian(stationsInternal) \
  .map(lambda pair: (pair[0].name, pair[1].name, distance(pair[0], pair[1]))) \
  .sortBy(lambda station: station[2], ascending=False) \
  .first()

res[2]

69.9318508210141

# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [15]:
path = tripsInternal.filter(lambda trip: trip.bike_id == bike_with_longest_duration[0]) \
  .sortBy(lambda trip: trip.start_date) \
  .take(10)

for p in path:
  print(p.start_station_name)

Post at Kearney
San Francisco Caltrain (Townsend at 4th)
San Francisco Caltrain 2 (330 Townsend)
Market at Sansome
2nd at Townsend
San Francisco City Hall
Civic Center BART (7th at Market)
Post at Kearney
Embarcadero at Sansome
Washington at Kearney


# 4. Найти количество велосипедов в системе.

In [16]:
cycles = tripsInternal.map(lambda trip: trip.bike_id) \
  .distinct() \
  .count()

cycles

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [17]:
users = tripsInternal \
  .filter(lambda x: x.duration > (3 * 60 * 60))\
  .map(lambda x: x.zip_code)\
  .filter(lambda x: x != "")\
  .distinct()

users.take(10)

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080']