<a href="https://colab.research.google.com/github/m-ni4da/main/blob/main/pyspark_%E5%95%8F%E9%A1%8C%E9%9B%86.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pyspark: Apache SparkのPython APIであるPySparkをインストールします。
# findspark: PySparkをPythonのシステムパスに追加するためのユーティリティです。
# pyarrow: Apache ArrowのPythonバインディングです。PySparkと組み合わせて使用すると、データの高速な相互変換や並列処理が可能になります。
# xlrd: Excelファイルを読み取るためのPythonライブラリです。
# openpyxl: Excelファイルを読み書きするためのPythonライブラリです。
# chardet: 文字エンコーディングを検出するためのPythonライブラリです。
!pip install pyspark
!pip install findspark
!pip install pyarrow
!pip install xlrd
!pip install openpyxl
!pip install chardet



In [None]:
# from pyspark.sql import SparkSession
# PySparkのSparkセッションを使用するために、SparkSession をインポートしています。

# from pyspark.sql.types import StringType, StructType, StructField
# PySparkで使用するデータ型やスキーマを定義するための StringType, StructType, StructField をインポートしています。

# import pyspark.sql.functions as fn
# PySparkで使用する関数をまとめてインポートしています。ここでは fn というエイリアスを使っています。

# from pyspark.sql.functions import col, lit
# データフレームの列へのアクセスや新しい列の追加など、列に関する操作に必要な col と lit をインポートしています。

# import pandas as pd
# Pandasライブラリを pd というエイリアスでインポートしています。Pandasはデータフレームやシリーズを操作するための強力なライブラリです。
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn
from pyspark.sql.functions import col, lit
import pandas as pd

In [None]:
# SparkSession.builder: Sparkセッションを構築するためのビルダーオブジェクトを取得します。このビルダーオブジェクトを使用して、セッションの構成を指定します。

# appName('pyspark'): Sparkアプリケーションの名前を指定します。この場合、アプリケーションの名前は 'pyspark' です。この名前は、SparkのウェブUIやログなどで識別に使用されます。

# getOrCreate(): 既存のSparkセッションを取得するか、新しいセッションを作成します。このメソッドは、既にSparkセッションが存在する場合にはそのセッションを返し、存在しない場合には新しいセッションを作成します。
spark = SparkSession.builder.appName('pyspark').getOrCreate()

In [None]:
# このコードは、`chardet` ライブラリを使用して、ファイルの文字エンコーディングを検出する関数を定義し、その関数を使用してファイルの文字エンコーディングを検出します。

# 1. `import chardet`: `chardet` ライブラリをインポートします。このライブラリは、与えられたテキストデータの文字エンコーディングを検出するための機能を提供します。

# 2. `def detect_encoding(file_path):`: `detect_encoding` という名前の関数を定義します。この関数は、ファイルのパスを受け取り、そのファイルの文字エンコーディングを検出します。

# 3. `with open(file_path, 'rb') as file:`: ファイルをバイナリモードで開きます。`'rb'` は読み取り専用でバイナリモードを指定します。

# 4. `result = chardet.detect(file.read(10000))`: ファイルから最初の10,000バイトを読み取り、`chardet.detect` 関数を使用して文字エンコーディングを検出します。検出された結果は `result` に格納されます。

# 5. `return result['encoding']`: 検出された文字エンコーディングを返します。

# 6. `file_name = 'order_history.csv'`: 検出するファイルの名前（またはパス）を指定します。

# 7. `encoding = detect_encoding(file_name)`: 検出された文字エンコーディングを取得します。

# 8. `print(f"Detected encoding: {encoding}")`: 検出された文字エンコーディングを表示します。
import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as file:

        result = chardet.detect(file.read(10000))
        return result['encoding']

file_name = 'order_history.csv'  # CSVファイル名(別の階層にある場合はパス)
encoding = detect_encoding(file_name)
print(f"Detected encoding: {encoding}")

Detected encoding: ascii


In [None]:
# このコードは、PySparkを使用してCSVファイルからデータフレームを読み込むためのコードです。各CSVファイルは異なる文字エンコーディングを使用している可能性があるため、それぞれのファイルに対して異なるエンコーディングを指定しています。

# 1. `member_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("members.csv")`:
#    - "members.csv" ファイルを読み込んで、データフレーム `member_sdf` を作成します。
#    - `format("csv")` は、読み込むファイルのフォーマットがCSVであることを指定します。
#    - `option("header", "true")` は、CSVファイルの1行目がヘッダーであることを指定します。
#    - `option("encoding", "ascii")` は、CSVファイルのエンコーディングをASCIIとして指定します。

# 2. `product_sdf = spark.read.format("csv").option("header", "true").option("encoding", "utf-8").load("products.csv")`:
#    - "products.csv" ファイルを読み込んで、データフレーム `product_sdf` を作成します。
#    - `option("encoding", "utf-8")` は、CSVファイルのエンコーディングをUTF-8として指定します。

# 3. `order_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("order_history.csv")`:
#    - "order_history.csv" ファイルを読み込んで、データフレーム `order_sdf` を作成します。
#    - `option("encoding", "ascii")` は、CSVファイルのエンコーディングをASCIIとして指定します。

# 各ファイルに異なるエンコーディングが使用されている場合、それぞれのファイルに対して適切なエンコーディングを指定することが重要です。これにより、データの正確な読み取りが確保されます。
member_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("members.csv")
product_sdf = spark.read.format("csv").option("header", "true").option("encoding", "utf-8").load("products.csv")
order_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("order_history.csv")

In [None]:
member_sdf.show()

+----------+---+------+
| member_id|age|gender|
+----------+---+------+
| cograph_1| 58|  Male|
| cograph_2| 44|Female|
| cograph_3| 50|Female|
| cograph_4| 62|Female|
| cograph_5| 59|  Male|
| cograph_6| 30|  Male|
| cograph_7| 50|Female|
| cograph_8| 38|  Male|
| cograph_9| 39|  Male|
|cograph_10| 44|Female|
|cograph_11| 41|  Male|
|cograph_12| 55|  Male|
|cograph_13| 48|  Male|
|cograph_14| 41|  Male|
|cograph_15| 44|Female|
|cograph_16| 43|Female|
|cograph_17| 55|Female|
|cograph_18| 38|Female|
|cograph_19| 43|Female|
|cograph_20| 31|  Male|
+----------+---+------+
only showing top 20 rows



In [None]:
member_sdf.select("member_id").show()

+----------+
| member_id|
+----------+
| cograph_1|
| cograph_2|
| cograph_3|
| cograph_4|
| cograph_5|
| cograph_6|
| cograph_7|
| cograph_8|
| cograph_9|
|cograph_10|
|cograph_11|
|cograph_12|
|cograph_13|
|cograph_14|
|cograph_15|
|cograph_16|
|cograph_17|
|cograph_18|
|cograph_19|
|cograph_20|
+----------+
only showing top 20 rows



In [None]:
member_sdf.select("gender", "member_id").show(5)

+------+---------+
|gender|member_id|
+------+---------+
|  Male|cograph_1|
|Female|cograph_2|
|Female|cograph_3|
|Female|cograph_4|
|  Male|cograph_5|
+------+---------+
only showing top 5 rows



In [None]:
product_sdf.filter(col("price").cast("int") >= 1000).show()

+----------+------------------------+------+
|product_id|            product_name| price|
+----------+------------------------+------+
|         1|      ハンバーグステーキ|2200.0|
|         2|          シーザーサラダ|2100.0|
|         3|        クラムチャウダー|1400.0|
|         4|        マルゲリータピザ|1300.0|
|         5|スパゲッティカルボナーラ|1300.0|
|         6|          ローストチキン|1300.0|
|         7|          ビーフシチュー|1500.0|
|         8|              オムライス|2600.0|
|         9|      シーフードパエリア|2000.0|
|        10|            ラタトゥイユ|1900.0|
+----------+------------------------+------+



In [None]:
member_sdf.filter(
    (col("age").cast("int") >= 20) & (col("gender") == "Female")
).show()

+----------+---+------+
| member_id|age|gender|
+----------+---+------+
| cograph_2| 44|Female|
| cograph_3| 50|Female|
| cograph_4| 62|Female|
| cograph_7| 50|Female|
|cograph_10| 44|Female|
|cograph_15| 44|Female|
|cograph_16| 43|Female|
|cograph_17| 55|Female|
|cograph_18| 38|Female|
|cograph_19| 43|Female|
|cograph_22| 47|Female|
|cograph_24| 33|Female|
|cograph_25| 63|Female|
|cograph_26| 25|Female|
|cograph_27| 40|Female|
|cograph_29| 55|Female|
|cograph_30| 55|Female|
|cograph_31| 42|Female|
|cograph_33| 31|Female|
|cograph_35| 37|Female|
+----------+---+------+
only showing top 20 rows



In [None]:
product_sdf.filter(col("price") <=800).agg(fn.avg("price")).show()

+----------+
|avg(price)|
+----------+
|     574.2|
+----------+



In [None]:
member_sdf.groupBy("gender").agg(fn.count("member_id")).show()

+------+----------------+
|gender|count(member_id)|
+------+----------------+
|Female|              80|
|  Male|             120|
+------+----------------+



In [None]:
order_sdf.groupBy("product_id").agg(
    fn.sum(col("quantity").cast("int")).alias("cnt"),
    ).select("product_id", "cnt").orderBy(col("product_id").cast("int")).show()

+----------+----+
|product_id| cnt|
+----------+----+
|         1| 345|
|         2| 329|
|         3| 334|
|         4| 301|
|         5| 309|
|         6| 154|
|         7| 149|
|         8| 166|
|         9| 182|
|        10| 185|
|        11| 552|
|        12| 534|
|        13| 586|
|        14|1260|
|        15| 505|
+----------+----+

