In [7]:
import ibmos2spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as tp

# @hidden_cell
# Connection settings for the IBM Cloud Object Storage bucket that holds the
# input files and receives the output file.
# NOTE(review): keep real keys out of shared copies of this notebook.
credentials = {
    'endpoint': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'service_id': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token',
    'bucket':'pilotbigdataprojectxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
}

configuration_name = 'os_bf5780348bf84067aa71ba938d56f9a0_configs'
# `sc` is not created in this notebook; it is expected to be the SparkContext
# that the notebook kernel provides.
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

# Select which data set to process:
#   0 -> whole training data, 2016-2017
#   1 -> smaller file, 2017Q4 only
#   2 -> test data, 2018/01 and 2018/02
# Any other value behaves like 0.
testRunInd=0

# Comma-separated list of station ids to restrict the run to, e.g. '83,64,288'.
# Leave empty to process every station.
stationId=''


if testRunInd==2:
    sourceTripFile='2018Q1_citibike-tripdata.csv.bz2'
    targetFile='TestData_2018Q1'
    dateRange=" '2018-01-01 00:00:00' and '2018-02-28 23:59:59' "

elif testRunInd==1:
    sourceTripFile='2017Q4_citibike-tripdata.csv.bz2'
    targetFile='TrainingData_2017Q4'
    dateRange=" '2017-10-01 00:00:00' and '2017-12-31 23:59:59' "

else:
    sourceTripFile='2016-2017_citibike-tripdata.csv.bz2'
    targetFile='TrainingData_2016-2017'
    dateRange=" '2016-01-01 00:00:00' and '2017-12-31 23:59:59' "

# The output name carries the station selection, with commas turned into dashes.
targetFile+='_Station-'+stationId.replace(',','-')+'.csv'
weatherDataPath=cos.url('WeatherData_1261418.csv', credentials['bucket'])
calendarDataPath=cos.url('calendar.csv',  credentials['bucket'])


# Reuses the running session if there is one, otherwise creates it.
spark = SparkSession.builder.appName("BikeDataIntermediateFile").getOrCreate()

In [8]:
## Prep Trip Data

# Location of the selected trip file inside the Cloud Object Storage bucket.
tripDataPath = cos.url(sourceTripFile,  credentials['bucket'])

# Explicit schema for the trip CSV; every column is nullable.
# NOTE(review): the end-station coordinates are read as strings while the
# start-station coordinates are doubles - confirm this is intentional.
bikeSchema = tp.StructType(list(
    tp.StructField(columnName, columnType, True)
    for columnName, columnType in (
        ("tripduration", tp.IntegerType()),
        ("starttime", tp.TimestampType()),
        ("stoptime", tp.TimestampType()),
        ("start station id", tp.StringType()),
        ("start station name", tp.StringType()),
        ("start station latitude", tp.DoubleType()),
        ("start station longitude", tp.DoubleType()),
        ("end station id", tp.StringType()),
        ("end station name", tp.StringType()),
        ("end station latitude", tp.StringType()),
        ("end station longitude", tp.StringType()),
        ("bikeid", tp.StringType()),
        ("usertype", tp.StringType()),
        ("birth year", tp.StringType()),
        ("gender", tp.StringType()),
    )
))

# Read the CSV with its header row, parsing timestamps as MM/dd/yyyy HH:mm:ss.
dfTripData = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option('header', 'true')
    .option("timestampFormat","MM/dd/yyyy HH:mm:ss")
    .schema(bikeSchema)
    .load(tripDataPath)
)



# Optional extra WHERE conditions that restrict the query to the stations
# listed in stationId. Both default to an empty string so the query below is
# valid when no station is selected.
stationFilterStart=''
stationFilterEnd=''

if stationId!='':
    stationFilterStart="""
   and `start station id` in (""" +stationId + """)
   """
    stationFilterEnd="""
   and `end station id` in (""" +stationId + """)
   """

dfTripData.createOrReplaceTempView("tripdata")

# Turn every trip into two events, both truncated to the hour:
#   - a departure (OutBike=1) at the start station and start time
#   - an arrival  (InBike=1)  at the end station and stop time
# Only events inside dateRange (and the optional station list) are kept.
q1 = """ Select Year*100+Month as YearMonth,
                StationID,
                Year,
                Month,
                Day,
                Hour,
                DateHour,
                InBike,
                OutBike
    from (
        SELECT
            cast(date_format(starttime, 'yyyy-MM-dd HH:00:00') as timestamp) as DateHour,
            cast(year(starttime) as smallint) as Year ,
            cast(month(starttime) as tinyint) as Month ,
            cast(day(starttime) as tinyint) as Day ,
            cast(hour(starttime) as tinyint) as Hour ,
            cast(`start station id` as smallint) as StationID,
            cast(0 as smallint) as InBike,
            cast(1 as smallint) as OutBike

        FROM
          tripdata m
        where starttime between """ +dateRange + stationFilterStart + """


    UNION ALL

        SELECT
            cast(date_format(stoptime, 'yyyy-MM-dd HH:00:00') as timestamp) as DateHour,
            cast(year(stoptime) as smallint) as Year ,
            cast(month(stoptime) as tinyint) as Month ,
            cast(day(stoptime) as tinyint) as Day ,
            cast(hour(stoptime) as tinyint) as Hour ,
            cast(`end station id` as smallint) as StationID,
            cast(1 as smallint) as InBike,
            cast(0 as smallint) as OutBike

        FROM
          tripdata m
        where stoptime between """ +dateRange + stationFilterEnd + """

    ) as TripDataTemp
    """

dfTripDataTemp= spark.sql(q1)
dfTripDataTemp.createOrReplaceTempView("TripDataTemp")

In [9]:
### Prep Calendar and Weather data

# Weather observations: the schema is inferred and timestamps are parsed as
# yyyy-MM-dd HH:mm.
dfWeatherData = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option("timestampFormat","yyyy-MM-dd HH:mm")
    .option("inferSchema", "true")
    .option('header', 'true')
    .load(weatherDataPath)
)

dfWeatherData.createOrReplaceTempView("WeatherData")

# Reduce the FM-15 reports to one row per hour.
# The inner query truncates each observation to the hour, replaces missing
# values with '' or 0, and strips non-numeric characters from the numeric
# text columns. The outer query keeps, for every hour, the values of the
# latest observation: the window is ordered by the original observation time
# (ObsTime) and spans the whole hour, so every row of an hour yields the same
# values and `distinct` collapses them into a single row. Ordering by the
# partition key instead would leave the chosen row arbitrary.
# NOTE: REPORTTPYE is spelled as the column appears in the weather file.
qw = """
    Select distinct
        DateHour
        ,LAST_VALUE(HOURLYPRSENTWEATHERTYPE) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as HOURLYPRSENTWEATHERTYPE
        ,LAST_VALUE(HOURLYPRSENTWEATHERTYPE_IND) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as HOURLYPRSENTWEATHERTYPE_IND
        ,LAST_VALUE(HOURLYDRYBULBTEMPC) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as HOURLYDRYBULBTEMPC
        ,LAST_VALUE(HOURLYWindSpeed) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as HOURLYWindSpeed
        ,LAST_VALUE(DAILYSnowDepth) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as DAILYSnowDepth
        ,LAST_VALUE(HOURLYPrecipT) over(partition by DateHour order by ObsTime rows between unbounded preceding and unbounded following) as HOURLYPrecip
    from (
        Select cast(date_format(DATE, 'yyyy-MM-dd HH:00:00') as timestamp) as DateHour,
            DATE as ObsTime,
            coalesce(HOURLYPRSENTWEATHERTYPE,'') as HOURLYPRSENTWEATHERTYPE,
            IF( HOURLYPRSENTWEATHERTYPE is not null, 1, 0) as HOURLYPRSENTWEATHERTYPE_IND,
            coalesce(cast(regexp_replace(HOURLYDRYBULBTEMPC,'[^0-9.]', '') as double),0) as HOURLYDRYBULBTEMPC,
            coalesce(HOURLYWindSpeed,0) as HOURLYWindSpeed,
            coalesce(cast(regexp_replace(DAILYSnowDepth,'[^0-9.]', '') as double),0) as DAILYSnowDepth,
            coalesce(cast(regexp_replace(HOURLYPrecip,'[^0-9.]', '') as double),0) as HOURLYPrecipT
        from WeatherData
        where REPORTTPYE='FM-15'
        and DATE between """ +dateRange + """

    )t1


 """
dfWeatherDataTemp = spark.sql(qw)
dfWeatherDataTemp.createOrReplaceTempView("WeatherDataTemp")


# Explicit schema for the calendar CSV; every column is nullable.
calendarSchema = tp.StructType(list(
    tp.StructField(columnName, columnType, True)
    for columnName, columnType in (
        ("year", tp.IntegerType()),
        ("month", tp.IntegerType()),
        ("day", tp.IntegerType()),
        ("date", tp.DateType()),
        ("weekday#", tp.IntegerType()),
        ("business day", tp.IntegerType()),
        ("Holiday", tp.IntegerType()),
    )
))

# Load the calendar file (with header row) and expose it to Spark SQL.
dfCalendar = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .schema(calendarSchema)
    .option('header', 'true')
    .load(calendarDataPath)
)

dfCalendar.createOrReplaceTempView("Calendar")

In [10]:
# Group the data for the final file
#
# The inner query sums arrivals (InBike) and departures (OutBike) per station
# and hour. The outer query adds the net change (Result), the hourly weather
# and the business-day flag from the calendar.
# Hours without a matching weather row take the most recent non-null weather
# value in DateHour order (LAST_VALUE(..., TRUE) ignores nulls).
# The inner query is deliberately not sorted: the joins and window functions
# do not preserve its row order, so a sort there would only add a wasted
# global sort. The export step sorts the rows by DateHour.
# NOTE(review): the windows have no PARTITION BY, so Spark evaluates them in a
# single partition - confirm this is acceptable for the full data set.
q  = """
select
    bikeData.*,
    InBike - OutBike as Result,
    COALESCE(wc.HOURLYPRSENTWEATHERTYPE, LAST_VALUE(wc.HOURLYPRSENTWEATHERTYPE, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as HOURLYPRSENTWEATHERTYPE,
    COALESCE(wc.HOURLYPRSENTWEATHERTYPE_IND, LAST_VALUE(wc.HOURLYPRSENTWEATHERTYPE_IND, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as HOURLYPRSENTWEATHERTYPE_IND,
    COALESCE(wc.HOURLYDRYBULBTEMPC, LAST_VALUE(wc.HOURLYDRYBULBTEMPC, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as HOURLYDRYBULBTEMPC,
    COALESCE(wc.HOURLYWindSpeed, LAST_VALUE(wc.HOURLYWindSpeed, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as HOURLYWindSpeed,
    COALESCE(wc.DAILYSnowDepth, LAST_VALUE(wc.DAILYSnowDepth, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as DAILYSnowDepth,
    COALESCE(wc.HOURLYPrecip, LAST_VALUE(wc.HOURLYPrecip, TRUE) OVER(ORDER BY bikeData.DateHour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as HOURLYPrecip,
    c.`business day` as workingDay

    from (

    select
            YearMonth,
            Hour,
            StationID,
            Year,
            Month,
            Day,
            DateHour,
            to_date(DateHour) as DATE,
            sum(InBike) as InBike,
            sum(OutBike) as OutBike

    from TripDataTemp as Final

    group by YearMonth,
        Hour,
        StationID,
        Year,
        Month,
        Day,
        DateHour
    ) bikeData

    left join WeatherDataTemp wc
    on bikeData.DateHour = wc.DateHour

    left join Calendar c
    on bikeData.DATE=c.date

"""

dfTrainingData = spark.sql(q)


In [11]:
# Write the result as CSV with a header row, replacing any previous output.
# All rows are moved into one partition and sorted by DateHour inside it.

savePath = cos.url(targetFile,  credentials['bucket'])
(
    dfTrainingData
    .repartition(1)
    .sortWithinPartitions("DateHour")
    .write
    .csv(savePath, mode ='overwrite', header = True)
)

In [12]:
# Rename the part- files written by Spark.
# Spark writes its output as <targetFile>/part-* objects plus a _SUCCESS
# marker. Each part object is copied to a flat name and the original is then
# deleted: the first becomes <targetFile>, later ones get a running number
# before the .csv extension.
import ibm_boto3
from botocore.client import Config

credentialsBoto = {
    'service_name':'s3',
    'endpoint_url': credentials['endpoint'],
    'ibm_api_key_id': credentials['api_key'],
    'config': Config(signature_version='oauth'),
    'ibm_auth_endpoint': credentials['iam_service_endpoint']}

resource = ibm_boto3.resource(**credentialsBoto)

client = ibm_boto3.client(**credentialsBoto)

# Search for the part- objects of this run.
Prefix=targetFile+'/part-'
fileList = client.list_objects_v2(Bucket= credentials['bucket'],Prefix=Prefix)

# The listing has no "Contents" entry when nothing matches the prefix. Checking
# for it explicitly keeps a KeyError raised during the copy/delete calls from
# being reported as a missing file.
partFiles = fileList.get("Contents", [])
if not partFiles:
    print('File not found while renaming. Error: no objects with prefix '+Prefix)

bucketName = credentials['bucket']
i=0
for files in partFiles:
    fileName = files["Key"]

    if fileName.startswith(Prefix):
        i+=1

        if i==1:
            newFileName=targetFile
        else:
            # Insert the running number before the '.csv' extension.
            newFileName=targetFile[:-4]+'%s.csv'%(i)

        # Copy first, delete afterwards: a failed copy raises before the
        # original object is removed.
        responseSave = resource.Bucket(bucketName).Object(newFileName).copy_from(CopySource=bucketName+'/'+fileName)
        responseDelete = resource.Bucket(bucketName).Object(fileName).delete()

# Delete the _SUCCESS marker file.
responseDelete = resource.Bucket(bucketName).Object(targetFile+'/_SUCCESS').delete()