In [1]:
from airflow_practice.config import settings
from pyspark.sql.session import SparkSession

In [2]:
# Create the Spark session, or reuse the one already running in this kernel.
spark = SparkSession.builder.getOrCreate()

# Use UTC as the session time zone so Spark SQL interprets and displays
# timestamps in UTC rather than in the machine's local time zone.
spark.conf.set(key="spark.sql.session.timeZone", value='UTC')

24/12/18 19:20:25 WARN Utils: Your hostname, LinnoMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.12 instead (on interface en0)
24/12/18 19:20:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/18 19:20:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


`rdd` stands for `Resilient Distributed Dataset`, Spark's low-level distributed collection of elements.

In [6]:
# Read every file under the raw-data folder (glob "*") into a single RDD whose
# elements are the individual text lines of those files.
path = str(settings.data_path / ".raw" / "*")
rdd = spark.sparkContext.textFile(name=path)


In [7]:
rdd.take(num=2)  # action: fetch the first 2 raw lines to the driver


['23583 20200101 0005 20191231 1505  2.514 -158.61   59.28   -17.5     0.0     30 0   -21.8 C 0    60 0 -99.000 -9999.0  1015 0   1.59 0',
 '23583 20200101 0010 20191231 1510  2.514 -158.61   59.28   -17.5     0.0     35 0   -22.1 C 0    60 0 -99.000 -9999.0  1015 0   1.35 0']

In [8]:
rdd.count()  # action: total number of lines across all input files

210816

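`map` is an RDD transformation that applies a function to every element (the "map" step of the MapReduce model). It is lazy: nothing is computed until an action such as `take` or `count` is called.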


In [10]:
from datetime import datetime, timezone
from pyspark.sql import Row


In [33]:
def parse_line(line: str, missing_value: float = -9999.0) -> "Row":
    """Parse one whitespace-delimited raw data line into a ``Row``.

    Fields used (0-based, after ``str.split()``):
      * ``f[0]``          -- station identifier, kept as a string (``wbanno``).
      * ``f[1]`` + ``f[2]`` -- date (``YYYYMMDD``) and time (``HHMM``),
        combined into a timezone-aware ``datetime`` tagged as UTC.
      * ``f[8]``          -- temperature; the missing-value sentinel
        becomes ``None``.

    Args:
        line: One raw text line, as produced by ``SparkContext.textFile``.
        missing_value: Sentinel the raw data uses for a missing temperature.
            Compared numerically, so ``-9999``, ``-9999.0`` and ``-9999.00``
            are all treated as missing.

    Returns:
        ``Row(timestamp=..., wbanno=..., temperature=...)``. The keyword order
        fixes the column order of DataFrames built from these rows.

    Raises:
        IndexError: if the line has fewer than 9 fields (e.g. a blank line).
        ValueError: if the date/time or temperature field is malformed.
    """
    fields = line.split()

    # NOTE(review): presumably the station's WBAN number (USCRN "WBANNO"
    # column) -- confirm against the dataset documentation.
    wbanno = fields[0]

    # f[1]/f[2] appear to already be the UTC date/time (f[3]/f[4] look like
    # local time in the sample rows), so the UTC tzinfo is only attached here;
    # no time-zone conversion is performed.
    timestamp = datetime.strptime(fields[1] + fields[2], "%Y%m%d%H%M").replace(
        tzinfo=timezone.utc
    )

    # Compare numerically: a textual check against '-9999.0' would miss
    # equivalent spellings such as '-9999'.
    raw_temperature = float(fields[8])
    temperature = None if raw_temperature == missing_value else raw_temperature

    return Row(timestamp=timestamp, wbanno=wbanno, temperature=temperature)

In [34]:
# Apply parse_line to every raw line. `map` is a lazy transformation: `rows` is
# an RDD of Row objects (not a Python list), and nothing is computed until an
# action such as take() or count() runs.
rows = rdd.map(parse_line)

In [35]:
rows.take(num=2)  # action: materialize the first 2 parsed Rows


[Row(timestamp=datetime.datetime(2020, 1, 1, 0, 5, tzinfo=datetime.timezone.utc), wbanno='23583', temperature=-17.5),
 Row(timestamp=datetime.datetime(2020, 1, 1, 0, 10, tzinfo=datetime.timezone.utc), wbanno='23583', temperature=-17.5)]

In [36]:
# Build a DataFrame from the parsed rows. Reuse `rows` rather than mapping
# `rdd` through parse_line a second time. The schema is inferred from the
# Row fields (timestamp, wbanno, temperature).
df = rows.toDF()
df


DataFrame[timestamp: timestamp, wbanno: string, temperature: double]

In [37]:
# Display the first 2 rows of the DataFrame as a text table.
df.show(n=2)


[Stage 15:>                                                         (0 + 1) / 1]

+-------------------+------+-----------+
|          timestamp|wbanno|temperature|
+-------------------+------+-----------+
|2020-01-01 00:05:00| 23583|      -17.5|
|2020-01-01 00:10:00| 23583|      -17.5|
+-------------------+------+-----------+
only showing top 2 rows



24/12/18 20:00:17 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 15 (TID 16): Attempting to kill Python Worker
                                                                                

In [38]:
# Show summary statistics (count, mean, stddev, min, max) for the numeric and
# string columns; the timestamp column is not included.
# NOTE: wbanno is a string identifier, so its mean/stddev are not meaningful.
# The gap between wbanno's count and temperature's count is the number of
# readings whose temperature parse_line set to None (missing).
df.describe().show()



[Stage 16:>                                                         (0 + 2) / 2]

+-------+-----------------+------------------+
|summary|           wbanno|       temperature|
+-------+-----------------+------------------+
|  count|           210816|            207720|
|   mean|          59994.0|2.7866825534372963|
| stddev|36411.08635760458| 9.831826313535235|
|    min|            23583|             -32.0|
|    max|            96405|              26.6|
+-------+-----------------+------------------+



                                                                                

Query the data with Spark SQL

In [39]:
# Register the DataFrame as the session-scoped view `uscrn` so the SQL below
# can refer to it by name (any existing view with that name is replaced).
df.createOrReplaceTempView(name='uscrn')

In [40]:
# Per-station extremes: the lowest and highest temperature and the timestamp
# at which each occurred. ORDER BY makes the displayed row order deterministic.
# NOTE(review): if several rows tie for the min/max temperature, min_by/max_by
# return one of their timestamps with no defined tie-break.
query = '''
SELECT
  wbanno,
  min_by(timestamp, temperature) AS timestamp_min,
  min(temperature)               AS t_min,
  max_by(timestamp, temperature) AS timestamp_max,
  max(temperature)               AS t_max
FROM
  uscrn
GROUP BY
  wbanno
ORDER BY
  wbanno
'''
spark.sql(query).show()

[Stage 19:>                                                         (0 + 2) / 2]

+------+-------------------+-----+-------------------+-----+
|wbanno|      timestamp_min|t_min|      timestamp_max|t_max|
+------+-------------------+-----+-------------------+-----+
| 23583|2020-02-01 16:15:00|-32.0|2020-08-17 00:20:00| 24.8|
| 96405|2020-01-10 17:30:00|-25.6|2020-07-03 22:55:00| 26.6|
+------+-------------------+-----+-------------------+-----+



                                                                                