# Load Taxi Trips Data from S3 bucket into Kinesis stream.


### Create an in-memory table for incoming data from S3 bucket
#### If you have not yet uploaded the taxi trips data file to the S3 bucket, please do so before executing the next step.


## IMPORTANT ⚠️ Make sure to update the 'path' option to point to the file in your S3 bucket
(The bucket name can be found in the Outputs section of the CloudFormation stack)
example: `s3://nyctaxitripsbucket***/taxi-trips.csv`

In [2]:

%flink.ssql(type=update)
-- Source table over the taxi-trips CSV file in S3 (filesystem connector).
-- Every field is read as text; nothing is parsed or cast in this table.
-- Replace <YOUR_BUCKET> in 'path' with your own bucket name before running.
DROP TABLE IF EXISTS nyc_yellow_taxi_trip_data;

-- Columns use STRING instead of a bare VARCHAR: in Flink SQL, VARCHAR with no
-- length means VARCHAR(1), which misdeclares multi-character values such as
-- timestamps and coordinates. STRING is the unbounded form.
-- The "longtitude" spelling is kept because the INSERT paragraph further down
-- selects these columns by exactly these names.
CREATE TABLE nyc_yellow_taxi_trip_data (
    id                  STRING,
    vendor_id           STRING,
    pickup_datetime     STRING,
    dropoff_datetime    STRING,
    passenger_count     STRING,
    pickup_longtitude   STRING,
    dropoff_longtitude  STRING,
    pickup_latitude     STRING,
    dropoff_latitude    STRING,
    store_and_fwd_flag  VARCHAR(1),
    gc_distance         STRING,
    trip_duration       STRING,
    google_distance     STRING,
    google_duration     STRING
)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://<YOUR_BUCKET>/taxi-trips.csv',
    'format' = 'csv'
);

### Query data from S3 

In [4]:
%flink.ssql(type=update)
-- Sanity check: show five rows read from the S3-backed source table.
SELECT *
FROM nyc_yellow_taxi_trip_data
LIMIT 5;

### Create an in-memory table for target kinesis stream
### Make sure the 'stream' and 'aws.region' options match the resources you created for the workshop

In [6]:

%flink.ssql(type=update)
-- Target table backed by a Kinesis data stream; rows are encoded as JSON.
-- Set 'stream' and 'aws.region' to match your own resources before running.
DROP TABLE IF EXISTS nyc_yellow_taxi_trip_data_stream;

-- Columns use STRING instead of a bare VARCHAR: in Flink SQL, VARCHAR with no
-- length means VARCHAR(1), which misdeclares multi-character values such as
-- timestamps and coordinates. STRING is the unbounded form.
-- 'scan.stream.initpos' = 'TRIM_HORIZON' makes reads of this table start from
-- the oldest record still retained in the stream.
CREATE TABLE nyc_yellow_taxi_trip_data_stream (
    id                STRING,
    vendorId          STRING,
    pickupDate        STRING,
    dropoffDate       STRING,
    passengerCount    STRING,
    pickupLongitude   STRING,
    dropoffLongitude  STRING,
    pickupLatitude    STRING,
    dropoffLatitude   STRING,
    storeAndFwdFlag   VARCHAR(1),
    gcDistance        STRING,
    tripDuration      STRING,
    googleDistance    STRING,
    googleDuration    STRING
)
WITH (
    'connector' = 'kinesis',
    'stream' = '<YOUR-STREAM>',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json'
);



 
### Start data insertion from S3 into Kinesis Stream using in-memory tables

In [8]:
%flink.ssql(type=update)
-- Copy every row of the S3-backed source table into the Kinesis-backed table.
-- Each snake_case source column is aliased to the camelCase name of the
-- corresponding target column, in the target table's column order.
INSERT INTO nyc_yellow_taxi_trip_data_stream
SELECT
    id,
    vendor_id           AS vendorId,
    pickup_datetime     AS pickupDate,
    dropoff_datetime    AS dropoffDate,
    passenger_count     AS passengerCount,
    pickup_longtitude   AS pickupLongitude,
    dropoff_longtitude  AS dropoffLongitude,
    pickup_latitude     AS pickupLatitude,
    dropoff_latitude    AS dropoffLatitude,
    store_and_fwd_flag  AS storeAndFwdFlag,
    gc_distance         AS gcDistance,
    trip_duration       AS tripDuration,
    google_distance     AS googleDistance,
    google_duration     AS googleDuration
FROM nyc_yellow_taxi_trip_data;


### Query Kinesis stream target table to check data is inserted correctly.


In [10]:
%flink.ssql(type=update)
-- Read the records back from the Kinesis-backed table to confirm the insert worked.
SELECT *
FROM nyc_yellow_taxi_trip_data_stream;


### &#x23F8; &nbsp;   You can stop the paragraph above once you have seen results.


### Steps to delete in-memory tables for cleanup purposes.

In [13]:
%flink.ssql(type=update)
-- Cleanup: remove the S3-backed source table definition.
-- IF EXISTS keeps this paragraph re-runnable when the table is already gone.
DROP TABLE IF EXISTS nyc_yellow_taxi_trip_data;
    

In [14]:
%flink.ssql(type=update)
-- Cleanup: remove the Kinesis-backed target table definition.
-- IF EXISTS keeps this paragraph re-runnable when the table is already gone.
DROP TABLE IF EXISTS nyc_yellow_taxi_trip_data_stream;

In [15]:
%flink.ssql


In [16]:
%flink.ssql
