In [None]:
pip install python-dotenv

In [None]:
import os
from os.path import join
from dotenv import load_dotenv

import pyspark as ps
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import col

In [None]:
# Session-level Spark configuration for this notebook.
# getOrCreate() reuses an already-running session if one exists.
ps_conf = (
    ps.SparkConf()
    # Do not write files whose names start with '_started' / '_committed_'
    # next to the output.
    .set("spark.sql.sources.commitProtocolClass",
         "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
    # Do not write the '_SUCCESS' marker file.
    .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    # Shuffle partition count.
    # NOTE(review): 200 is Spark's default for this setting, so this line does
    # not increase the partition count. Raise the value if more partitions
    # were intended.
    .set("spark.sql.shuffle.partitions", "200")
)
# The commit-protocol settings above only affect Spark DataFrame writers;
# they have no effect on files written through pandas.
spark = SparkSession.builder.config(conf=ps_conf).getOrCreate()

In [None]:
# Load environment variables from the '.env' file in the current working
# directory. The boolean returned by load_dotenv is left as the cell's
# displayed value.
# NOTE(review): no visible cell reads the loaded variables; confirm they are
# still needed.
dotenv_file = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_file)

In [None]:
# Event CSVs in the silver layer; the one used is chosen by `path` below.
# When switching events, also switch the output file in the export step so
# the beacon CSV name matches the event.
#
# BASE_PATH and SILVER_PATH are concatenated with no separator, giving
# '/mnt/adintedataexplorer_ml-medallion/...'. The leading underscore in
# SILVER_PATH suggests the mount-point name itself contains it.
# NOTE(review): confirm before switching to os.path.join, which would insert
# a '/' and change the path.
BASE_PATH     = '/mnt/adintedataexplorer'
SILVER_PATH   = '_ml-medallion/dev/test_silver/'
MITSUBISHI    = 'csv_data/mitsubishi_japan_motor_show.csv'
NAGAOKA_HNB   = 'csv_data/nagaoka_hanabi.csv'
AKA_RENGA     = 'csv_data/yokohama_aka_renga.csv'
path = BASE_PATH + SILVER_PATH + AKA_RENGA
df_csv = (
    spark.read
    .option('inferSchema', 'True')
    .option('header', 'True')
    .csv(path)
)

df_csv.display()

# Sorted list of the distinct, non-null `unit_id` values of the selected event.
# - Nulls are dropped first: sorted() cannot compare None with other values,
#   and a null entry could never match in the later isin() filter anyway.
# - Rows are unpacked from collect() rather than via `.rdd`, which is not
#   available on Spark Connect / shared-access clusters.
utid_list = sorted(
    row['unit_id']
    for row in df_csv.select('unit_id').dropna().drop_duplicates().collect()
)

In [None]:
# Observation window (inclusive) and the source beacon log table.
START_DAY = '2019-07-26'
END_DAY   = '2019-11-05'
AIBEACON_PATH = 'adinte.aibeacon_wifi_log'

# Row predicates. They are resolved by column name against the frame they
# filter, so `date` here refers to the already-converted date column.
in_window = F.col('date').between(START_DAY, END_DAY)   # >= START_DAY and <= END_DAY
is_randomized = F.col('randomized') == '1'
from_event_units = F.col('unit_id').isin(utid_list)

# Convert date/time columns to proper types, then keep only rows that satisfy
# every predicate.
df_raw_data = (
    spark.table(AIBEACON_PATH)
    .withColumn('date', F.to_date(F.col('date')))
    .withColumn('datetime', F.to_timestamp(F.col('datetime')))
    .filter(in_window & is_randomized & from_event_units)
)
df_raw_data.display()

In [None]:
# Export the filtered beacon log as a single CSV next to the event CSVs.
# Same base/silver values as the event-loading cell.
BASE_PATH   = '/mnt/adintedataexplorer'
SILVER_PATH = '_ml-medallion/dev/test_silver/'
MI_PATH     = 'csv_data/beacon_for_mitsubishi.csv'
NA_PATH     = 'csv_data/beacon_for_nagaoka.csv'
AK_PATH     = 'csv_data/beacon_for_aka_renga.csv'
# The output file must correspond to the event CSV used to build `utid_list`
# (currently AKA_RENGA). Writing to MI_PATH would store Aka Renga beacon data
# under the Mitsubishi filename. Keep these two choices in sync.
path = BASE_PATH + SILVER_PATH + AK_PATH
# `path` is a Spark-style path. pandas writes through the local filesystem,
# where DBFS (including /mnt mounts) is exposed under '/dbfs'.
# NOTE(review): assumes a Databricks cluster with the /dbfs FUSE mount
# available; confirm.
local_path = '/dbfs' + path
# toPandas() collects the whole filtered frame onto the driver.
pd_raw_data = df_raw_data.toPandas()
pd_raw_data.to_csv(local_path, index=False, header=True)