# 데이터 전송

데이터가 JSON 형태로 전송되고 있다고 가정한다. (01_data_making에서 진행)

스트리밍으로 전송된 데이터는 Kinesis 스트림을 거쳐 Kinesis 파이어호스로 전달된다.

파이어호스에서 JSON 형식을 Parquet 형식으로 변환한 후 S3에 저장한다.

# 키네시스 스트림 생성 및 삭제

In [2]:
import threading, sys, pandas, json, time
from boto import kinesis

sys.path.append("../lib") # 기준은 현재 커서의 위치
import sample_kinesis_wordputter as kinesis_kustom # kinesis stream 관련 오픈소스 코드

# Kinesis stream name shared by the helper functions below
stream_name = 'datastream_1'
# Open the Kinesis connection for the ap-northeast-2 region
conn = kinesis.connect_to_region(region_name = 'ap-northeast-2')

def put_rows_in_kinesis(w, partition_key="partitionkey"):
    """Send one record to the Kinesis stream named by ``stream_name``.

    Failures are written to stderr instead of being raised, so a caller
    looping over many records keeps going after a failed put.

    Args:
        w: JSON-encoded payload to send.
        partition_key: Value passed as the third argument of ``put_record``.
            Defaults to the fixed key this function has always used.
    """
    try:
        conn.put_record(stream_name, w, partition_key)
        print("전송한 데이터: " + stream_name)
    except Exception as e:
        # Use format() rather than "+": concatenating a non-str payload would
        # raise TypeError inside this handler and hide the original error.
        # The trailing newline keeps consecutive error messages on separate lines.
        sys.stderr.write(
            "예외: {w} Stream 이름: {s} 원인: {e}\n".format(w=w, s=stream_name, e=e)
        )

# 키네시스 스트림 생성.
def start_kinesis():
    """Make sure the Kinesis stream ``stream_name`` exists before returning.

    The stream status is looked up through
    ``kinesis_kustom.get_stream_status``. If the lookup raises, a one-shard
    stream is created and ``kinesis_kustom.wait_for_stream`` is called on it.
    If the stream is being deleted the process exits with status 1. For any
    other non-ACTIVE status ``kinesis_kustom.wait_for_stream`` is called.

    Raises:
        SystemExit: If the stream status is 'DELETING'.
    """
    try:
        status = kinesis_kustom.get_stream_status(conn, stream_name)
    except Exception:
        # A failed lookup is treated as "stream does not exist yet" --
        # presumably the helper raises in that case; verify against
        # sample_kinesis_wordputter.
        conn.create_stream(stream_name, 1)
        kinesis_kustom.wait_for_stream(conn, stream_name)
        return

    # Handled outside the try block: sys.exit() raises SystemExit, which a
    # bare "except:" would swallow and then attempt to create the very
    # stream that is being deleted.
    if status == 'DELETING':
        print('Stream 이름 {s} 은 삭제 중..'.format(s=stream_name))
        sys.exit(1)
    if status != 'ACTIVE':
        kinesis_kustom.wait_for_stream(conn, stream_name)

start_kinesis() 

In [50]:
# 아래 코드로도 간단하게 kinesis 생성 가능
!aws kinesis create-stream --stream-name=datastream_1 --shard-count=1


An error occurred (ResourceInUseException) when calling the CreateStream operation: Stream datastream_1 under account 848045215644 already exists.


In [1]:
!aws kinesis delete-stream --stream-name datastream_1


An error occurred (ResourceNotFoundException) when calling the DeleteStream operation: Stream datastream_1 under account 848045215644 not found.


# 파이어호스 생성 및 삭제

In [64]:
# https://github.com/aws/aws-cli/issues/2528 참고한 사이트
!aws firehose create-delivery-stream --region=ap-northeast-2 --cli-input-json file://../data/firehose_cli_parameter.json

{
    "DeliveryStreamARN": "arn:aws:firehose:ap-northeast-2:848045215644:deliverystream/test_jang"
}


In [65]:
!aws firehose delete-delivery-stream --region=ap-northeast-2 --delivery-stream-name test_jang

# 전송 할 데이터 읽기


In [3]:
# utils는 json 또는 jsonl 파일을 읽고 쓰는 유틸리티 함수를 모아 둔 모듈
sys.path.append("../lib")
import utils

rawdata_type_jsonl = utils.read_json_lines_file("../data/Raw_Data.jsonl")

진행도: 200000


# 데이터 키네시스 스트림으로 전송

In [10]:
# Send only the first 10 records to Kinesis.
# islice caps the count before the put happens; the previous counter was
# checked after the put, so an 11th record was sent.
from itertools import islice

for record in islice(rawdata_type_jsonl, 10):
    put_rows_in_kinesis(json.dumps(record))
    print(record, '\n')
    # Throttle to one record per second.
    time.sleep(1)

전송한 데이터: datastream_1
{'Year': '2015', 'Quarter': '1', 'Month': '1', 'DayofMonth': '1', 'DayOfWeek': '4', 'FlightDate': '2015-01-01', 'Carrier': 'AA', 'TailNum': 'N001AA', 'FlightNum': '1519', 'Origin': 'DFW', 'OriginCityName': 'Dallas/Fort Worth, TX', 'OriginState': 'TX', 'Dest': 'MEM', 'DestCityName': 'Memphis, TN', 'DestState': 'TN', 'DepTime': '1342', 'DepDelay': -3.0, 'DepDelayMinutes': 0, 'TaxiOut': 16.0, 'TaxiIn': 7.0, 'WheelsOff': '1358', 'WheelsOn': '1457', 'ArrTime': '1504', 'ArrDelay': -6.0, 'ArrDelayMinutes': 0.0, 'Cancelled': 0, 'Diverted': 0, 'ActualElapsedTime': 82.0, 'AirTime': 59.0, 'Flights': 1, 'Distance': 432.0, 'CRSDepTime': '1345', 'CRSArrTime': '1510'} 

전송한 데이터: datastream_1
{'Year': '2015', 'Quarter': '1', 'Month': '1', 'DayofMonth': '1', 'DayOfWeek': '4', 'FlightDate': '2015-01-01', 'Carrier': 'AA', 'TailNum': 'N001AA', 'FlightNum': '1519', 'Origin': 'MEM', 'OriginCityName': 'Memphis, TN', 'OriginState': 'TN', 'Dest': 'DFW', 'DestCityName': 'Dallas/Fort Worth,

# 키네시스 스트림에 전송된 데이터 확인

In [5]:
# Check that records are arriving in the Kinesis stream
# (copy and paste into a python3 console to run).
import json
from boto import kinesis
import time

# Keep the connection under its own name so the imported ``kinesis`` module
# is not shadowed by the connection object.
kinesis_conn = kinesis.connect_to_region('ap-northeast-2')
# A 'LATEST' iterator starts after the newest record, so only records put
# after this point are returned.
shard_it = kinesis_conn.get_shard_iterator('datastream_1', 'shardId-000000000000', 'LATEST')['ShardIterator']

# NextShardIterator comes back null once the shard is closed; stop there
# instead of polling forever with None.
while shard_it:
    out = kinesis_conn.get_records(shard_it, limit=2)
    print(out['Records'])
    shard_it = out.get('NextShardIterator')
    time.sleep(5)

[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]


KeyboardInterrupt: 

# s3에 저장된 데이터 확인

S3에 저장된 데이터를 확인하려면 우선 키네시스 파이어호스를 생성해야 한다.

여기서는 AWS 웹 콘솔(UI)에서 생성했다.

In [11]:
# s3에서 firehose를 통해 parquet로 저장된 데이터 가져오기
#!aws s3 cp s3://jhw620/2019/07/11/02 ../data/ --recursive
!aws s3 cp s3://jhw620/2022/07/11/08/ ../data/ --recursive

Completed 6.2 KiB/6.2 KiB (121.7 KiB/s) with 1 file(s) remainingdownload: s3://jhw620/2022/07/11/08/KDS-S3-0PXzw-2-2022-07-11-08-22-06-2dfea19f-9798-46a4-b2d1-1624eee5452f.parquet to ../data/KDS-S3-0PXzw-2-2022-07-11-08-22-06-2dfea19f-9798-46a4-b2d1-1624eee5452f.parquet


In [8]:
# Older run
# Reuse the Spark handles if an earlier cell already defined them.
try:
    sc and spark
except NameError:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    # builder.getOrCreate() reuses a running SparkContext instead of
    # constructing a second one, which fails when ``sc`` already exists
    # but ``spark`` does not.
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

# Read the saved Parquet data.
on_time_dataframe = spark.read.parquet('../data/firehose_stream-1-2019-07-11-02-10-46-4139e534-1f12-4fe3-8f55-c9b9df8a31ec.parquet')
on_time_dataframe.show(3)


+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-------+------+---------+--------+-------+--------+---------------+---------+--------+-----------------+-------+-------+--------+----------+----------+------------+------------+--------+-------------+-----------------+
|year|quarter|month|dayofmonth|dayofweek|flightdate|carrier|tailnum|flightnum|origin|      origincityname|originstate|dest|        destcityname|deststate|deptime|depdelay|depdelayminutes|taxiout|taxiin|wheelsoff|wheelson|arrtime|arrdelay|arrdelayminutes|cancelled|diverted|actualelapsedtime|airtime|flights|distance|crsdeptime|crsarrtime|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|
+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-

In [1]:
# Most recent run (2022-07-11)
# Reuse the Spark handles if an earlier cell already defined them.
try:
    sc and spark
except NameError:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    # builder.getOrCreate() reuses a running SparkContext instead of
    # constructing a second one, which fails when ``sc`` already exists
    # but ``spark`` does not.
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

# Read the saved Parquet data.
on_time_dataframe = spark.read.parquet('../data/KDS-S3-0PXzw-2-2022-07-11-08-22-06-2dfea19f-9798-46a4-b2d1-1624eee5452f.parquet')
on_time_dataframe.show(3)


+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-------+------+---------+--------+-------+--------+---------------+---------+--------+-----------------+-------+-------+--------+----------+----------+------------+------------+--------+-------------+-----------------+
|year|quarter|month|dayofmonth|dayofweek|flightdate|carrier|tailnum|flightnum|origin|      origincityname|originstate|dest|        destcityname|deststate|deptime|depdelay|depdelayminutes|taxiout|taxiin|wheelsoff|wheelson|arrtime|arrdelay|arrdelayminutes|cancelled|diverted|actualelapsedtime|airtime|flights|distance|crsdeptime|crsarrtime|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|
+----+-------+-----+----------+---------+----------+-------+-------+---------+------+--------------------+-----------+----+--------------------+---------+-------+--------+---------------+-

In [2]:
on_time_dataframe.limit(15).toPandas()

Unnamed: 0,year,quarter,month,dayofmonth,dayofweek,flightdate,carrier,tailnum,flightnum,origin,...,airtime,flights,distance,crsdeptime,crsarrtime,carrierdelay,weatherdelay,nasdelay,securitydelay,lateaircraftdelay
0,2015,1,1,1,4,2015-01-01,AA,N001AA,1519,DFW,...,59.0,1,432.0,1345,1510,,,,,
1,2015,1,1,1,4,2015-01-01,AA,N001AA,1519,MEM,...,77.0,1,432.0,1550,1730,,,,,
2,2015,1,1,1,4,2015-01-01,AA,N002AA,2349,ORD,...,129.0,1,802.0,1845,2115,0.0,0.0,26.0,0.0,0.0
3,2015,1,1,1,4,2015-01-01,AA,N003AA,1298,DFW,...,93.0,1,731.0,1820,2120,19.0,0.0,12.0,0.0,81.0
4,2015,1,1,1,4,2015-01-01,AA,N003AA,1422,DFW,...,111.0,1,769.0,800,925,78.0,0.0,0.0,0.0,0.0
5,2015,1,1,1,4,2015-01-01,AA,N003AA,1422,HDN,...,108.0,1,769.0,1005,1320,254.0,0.0,4.0,0.0,78.0
6,2015,1,1,1,4,2015-01-01,AA,N004AA,2287,JAC,...,146.0,1,1047.0,800,1200,0.0,0.0,21.0,0.0,0.0
7,2015,1,1,1,4,2015-01-01,AA,N005AA,1080,EGE,...,,1,1007.0,1415,1755,,,,,
8,2015,1,1,1,4,2015-01-01,AA,N005AA,1080,ORD,...,,1,1007.0,1145,1335,,,,,
9,2015,1,1,1,4,2015-01-01,AA,N005AA,2332,DFW,...,,1,802.0,740,955,,,,,
