In [1]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

22/06/04 11:05:30 WARN Utils: Your hostname, primary resolves to a loopback address: 127.0.1.1; using 192.168.64.2 instead (on interface enp0s2)
22/06/04 11:05:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/06/04 11:05:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
rdd = spark.sparkContext.textFile("./raw/*")

In [3]:
rdd.take(2)

                                                                                

['26656 20200101 0005 20191231 1505      3 -164.08   61.35   -20.8     0.0     26 0   -21.6 C 0    80 0 -99.000 -9999.0  1005 0   6.69 0',
 '26656 20200101 0010 20191231 1510      3 -164.08   61.35   -20.8     0.0     24 0   -21.6 C 0    79 0 -99.000 -9999.0  1005 0   7.64 0']

In [4]:
rdd.count()

                                                                                

192360

In [5]:
from datetime import datetime,timezone
from pyspark.sql import Row

In [6]:
def parse_line(line):
    f = line.split()
    #観測所番号
    wbanno = f[0]
    
    dt = datetime.strptime(f[1] + f[2], "%Y%m%d%H%M")
    dt = dt.replace(tzinfo = timezone.utc)
    ##気温を実数に変換する
    
    temperature = None if f[8] == "-9999.0" else float(f[8])
    
    return Row(timestamp=dt, wbanno=wbanno, temperature=temperature)

In [7]:
rows = rdd.map(parse_line)
rows.take(2)

[Row(timestamp=datetime.datetime(2020, 1, 1, 0, 5, tzinfo=datetime.timezone.utc), wbanno='26656', temperature=-20.8),
 Row(timestamp=datetime.datetime(2020, 1, 1, 0, 10, tzinfo=datetime.timezone.utc), wbanno='26656', temperature=-20.8)]

In [8]:
#RDDからデータフレームを作成する
df = rdd.map(parse_line).toDF()
df

DataFrame[timestamp: timestamp, wbanno: string, temperature: double]

In [9]:
#タイムゾーンをUTCにセットする
spark.conf.set("spark.sql.session.timeZone","UTC")

In [10]:
df.show(2)

                                                                                

+-------------------+------+-----------+
|          timestamp|wbanno|temperature|
+-------------------+------+-----------+
|2020-01-01 00:05:00| 26656|      -20.8|
|2020-01-01 00:10:00| 26656|      -20.8|
+-------------------+------+-----------+
only showing top 2 rows



In [11]:
#データフレームの統計値を表示
df.describe().show()

                                                                                

+-------+------------------+------------------+
|summary|            wbanno|       temperature|
+-------+------------------+------------------+
|  count|            192360|            192156|
|   mean|           25119.5|1.4450451716313355|
| stddev|1536.5039938292543|11.572590880335754|
|    min|             23583|             -32.0|
|    max|             26656|              24.8|
+-------+------------------+------------------+



In [12]:
#データベースを一時的なビューとして登録
df.createOrReplaceTempView("uscrn")

In [13]:
#観測所ごとに最低気温と最高気温を集計

query = """SELECT wbanno,
min_by(timestamp,temperature) timestamp_min,
min(temperature) t_min,
max_by(timestamp, temperature) timestamp_max,
max(temperature) t_max 
from uscrn 
GROUP by 1"""

spark.sql(query).show()



+------+-------------------+-----+-------------------+-----+
|wbanno|      timestamp_min|t_min|      timestamp_max|t_max|
+------+-------------------+-----+-------------------+-----+
| 23583|2020-02-01 16:15:00|-32.0|2020-08-17 00:20:00| 24.8|
| 26656|2020-02-09 15:15:00|-30.8|2020-05-30 23:05:00| 23.3|
+------+-------------------+-----+-------------------+-----+



                                                                                

In [14]:
#列志向ストレージとして保存（デフォルトではParquet形式)
df.write.save("./uscrn-parquet")

AnalysisException: path file:/home/ubuntu/wdpressplus-bigdata/notebooks/uscrn-parquet already exists.;

In [None]:
!ls ./uscrn-parquet

In [None]:
#データフレームを読み込む
df = spark.read.load("./uscrn-parquet")

#観測所ごとに平均気温を計算
df.groupBy("wbanno").avg("temperature").show()

In [None]:
#3ヶ月のデータを抽出
df = spark.read.load("./uscrn-parquet")
df1 = df.where("timestamp >= '2020-01-01' AND timestamp < '2020-04-01'")

In [None]:
#レコード数雨を確認しておく
df1.count()

In [None]:
#CSV形式でファイルに書き出す（一つのファイルにまとめる）
df1.coalesce(1).write.save("./export", format="csv", header=True)

In [None]:
#出力ファイルを確認
!ls ./export

In [None]:
#デスクトップにコピー
!cp ./export/*.csv ~/Home/Desktop

In [None]:
!pip install pyarrow

In [None]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "UTC")

In [None]:
#sparkのデータフレームとしてロードする
df = spark.read.load("./uscrn-parquet")

In [None]:
#sparkによる集計結果をpandasのデータフレームに変換
df1 = df.groupBy("timestamp").avg().toPandas()

In [None]:
df1.sort_values(by="avg(temperature)",accending=False).head(2)

In [None]:
df1

In [None]:
#Parquet形式のファイルを読み込み
import pandas as pd
df = pd.read_parquet("./uscrn-parquet")

In [None]:
#pandasでデータを集計
df1 = df.groupby("timestamp").mean()
df1.sort_values(by="temperature", ascending=False).head(2)