In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

# Build (or reuse) a Hive-enabled SparkSession that runs locally on all cores
# and is configured to read from S3 through the s3a connector.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("sample_load_s3")
    .enableHiveSupport()
    .config("spark.submit.deployMode", "client")
    # s3a file system implementation and credential handling.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true")
    # Regional endpoint (eu-central-1) for the bucket that is read below.
    .config("spark.hadoop.fs.s3a.endpoint", "https://s3.eu-central-1.amazonaws.com")
    # Hadoop credential provider path pointing at a JCEKS keystore on HDFS,
    # so no AWS keys are written into the notebook itself.
    .config(
        "spark.hadoop.hadoop.security.credential.provider.path",
        "jceks://hdfs/user/data_sci/igl_aws.jceks",
    )
    # Extra jar (spark-avro) shipped with the session.
    .config(
        "spark.jars",
        "hdfs://hadoop-supercrunch-mvp/user/data_sci/spark-avro_2.11-4.0.0.jar",
    )
    .getOrCreate()
)



In [20]:
# Load the 2003 weather files from S3 as plain text. The text source yields a
# single string column named "value" with one row per input line.
rawWeatherData = spark.read.format("text").load("s3a://dimajix-training/data/weather/2003")

# Bucket overview in the AWS console:
# https://console.aws.amazon.com/s3/buckets/dimajix-training/data/?region=eu-central-1&tab=overview

# Parent location of the weather data (s3:// form of the path read above):
# s3://dimajix-training/data/weather

# Path of an Avro object in a different bucket; it is not read by the code
# shown here (presumably kept as a reference for the spark-avro jar -- confirm):
# s3a://algo-output/20170401999007/brand-equity-digicam-cn-24-20170401/part-00000-f7111240-59a6-4323-9ae9-fa774b09e182-c000.avro

In [21]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


# Parse the raw weather lines into typed columns. Every field is cut out of the
# single "value" column at a fixed character offset; substring() takes a
# 1-based start position followed by a length.
line = col("value")

# The observation time is a 12-character "yyyyMMddHHmm" field; it is exposed
# both as a timestamp and as its long (integer) cast.
observed_at = to_timestamp(substring(line, 16, 12), "yyyyMMddHHmm")

# Keep the DataFrame itself in weatherData. DataFrame.show() only prints and
# returns None, so chaining .show() onto this assignment would leave
# weatherData bound to None and unusable in later cells.
weatherData = rawWeatherData.select(
    substring(line, 5, 6).alias("usaf"),
    substring(line, 11, 5).alias("wban"),
    observed_at.alias("timestamp"),
    observed_at.cast("long").alias("ts"),
    substring(line, 42, 5).alias("report_type"),
    substring(line, 61, 3).alias("wind_direction"),
    substring(line, 64, 1).alias("wind_direction_qual"),
    substring(line, 65, 1).alias("wind_observation"),
    # The numeric fields are stored scaled by ten, so divide after casting.
    (substring(line, 66, 4).cast("float") / lit(10.0)).alias("wind_speed"),
    substring(line, 70, 1).alias("wind_speed_qual"),
    (substring(line, 88, 5).cast("float") / lit(10.0)).alias("air_temperature"),
    substring(line, 93, 1).alias("air_temperature_qual"),
)

# Print a preview of the parsed rows (same table the cell displayed before).
weatherData.show()


+------+-----+-------------------+----------+-----------+--------------+-------------------+----------------+----------+---------------+---------------+--------------------+
|  usaf| wban|          timestamp|        ts|report_type|wind_direction|wind_direction_qual|wind_observation|wind_speed|wind_speed_qual|air_temperature|air_temperature_qual|
+------+-----+-------------------+----------+-----------+--------------+-------------------+----------------+----------+---------------+---------------+--------------------+
|703160|25624|2003-01-01 00:00:00|1041379200|      SY-MT|           010|                  5|               N|       5.2|              5|           -0.6|                   5|
|703160|25624|2003-01-01 00:17:00|1041380220|      FM-16|           020|                  1|               N|       4.6|              1|           -2.0|                   1|
|703160|25624|2003-01-01 00:53:00|1041382380|      FM-15|           010|                  5|               N|       5.2|          