In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
conf = SparkConf().setAppName("lab1_San_Francisco").setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [7]:
tripData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("data/trip.csv")

tripData

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [8]:
stationData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("data/station.csv")

stationData

DataFrame[id: int, name: string, lat: double, long: double, dock_count: int, city: string, installation_date: timestamp]

In [9]:
tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [10]:
stationData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



In [95]:
from typing import NamedTuple
from datetime import datetime
from __future__ import division

In [29]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [34]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [35]:
tripsData = sc.textFile("trip.csv")
stationsData = sc.textFile("station.csv")
tripsHeader = tripsData.first()
stationsHeader = stationsData.first()

In [36]:
trips = tripsData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))
stations = stationsData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [33]:
tripsInternal = trips.mapPartitions(initTrip)
stationsInternal = stations.mapPartitions(initStation)

In [16]:
stationData.createOrReplaceTempView("stations")
tripData.createOrReplaceTempView("trips")

In [40]:
spark.sql("""
    SELECT bike_id
    FROM trips
    GROUP BY trips.bike_id
    ORDER BY sum(duration) DESC
    LIMIT 1
""").show()

+-------+
|bike_id|
+-------+
|    535|
+-------+



In [53]:
# 2 способ

In [128]:
bike_max_duration = tripsInternal.map(lambda trip: (trip.bike_id, trip.duration)).reduceByKey(lambda trip1, trip2: trip1 + trip2).top(1, key = lambda trip: trip[1])[0][0]

In [129]:
bike_max_duration

535

In [122]:
from math import sin, cos, sqrt, atan2, radians, acos
R = 6371
def getDistance(lat1, long1, lat2, long2):
    lat1_rad = radians(lat1)
    lat2_rad = radians(lat2)
    long1_rad = radians(long1)
    long2_rad = radians(long2)
    d = sin(lat1_rad)*sin(lat2_rad)+cos(lat1_rad)*cos(lat2_rad)*cos(long1_rad-long2_rad)
    if -1 <= d <= 1:
        return R * acos(d)
    else:
        return 0

In [127]:
crossed_stations = stationData.crossJoin(stationData)
crossed_stations.rdd.map(lambda x: getDistance(x[2], x[3], x[9], x[10])).max()

69.92087595421542

In [170]:
stations = tripsInternal.filter(lambda x: x.bike_id == bike_max_duration).sortBy(lambda x: x.start_date).map(lambda x: (x.start_station_name, x.end_station_name))

In [171]:
stations.top(100)

[('Yerba Buena Center of the Arts (3rd @ Howard)', 'Townsend at 7th'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'Townsend at 7th'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'Steuart at Market'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'Spear at Folsom'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'Spear at Folsom'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'South Van Ness at Market'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)',
  'San Francisco Caltrain (Townsend at 4th)'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)',
  'San Francisco Caltrain (Townsend at 4th)'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)',
  'San Francisco Caltrain (Townsend at 4th)'),
 ('Yerba Buena Center of the Arts (3rd @ Howard)', 'Market at Sansome'),
 (

In [175]:
#duration
tripsInternal.filter(lambda x: x.bike_id == bike_max_duration).sortBy(lambda x: x.start_date).map(lambda x: (x.bike_id, x.duration)).reduceByKey(lambda trip1, trip2: trip1 + trip2).top(1, key = lambda trip: trip[1])[0][1]

18335738

In [174]:
tripsInternal.map(lambda trip: trip.bike_id).distinct().count()

700

Найти пользователей потративших на поездки более 3 часов

In [205]:
spark.sql("""
        SELECT bike_id
        FROM trips
        GROUP BY bike_id
        HAVING sum(duration) > 180 * 60
""").show()

+-------+
|bike_id|
+-------+
|    471|
|    496|
|    148|
|    463|
|    540|
|    392|
|    623|
|    243|
|    516|
|     31|
|    580|
|    137|
|    251|
|    451|
|     85|
|    458|
|     65|
|    588|
|    255|
|     53|
+-------+
only showing top 20 rows



In [208]:
spark.sql("""
        SELECT count(*)
        FROM (
        SELECT bike_id
        FROM trips
        GROUP BY bike_id
        HAVING sum(duration) > 180 * 60)
""").show()

+--------+
|count(1)|
+--------+
|     699|
+--------+



In [215]:
sc.stop()