# データサイエンス100本ノック（構造化データ加工編） - pySpark

## はじめに
- 初めに以下のセルを実行してください
- データはデータサイエンティスト協会が用意したダミーデータです

In [148]:
#データ読み込み
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df_customer = spark.read.option("inferSchema", "true").option("header","True").csv("./data/customer.csv")
df_category = spark.read.option("inferSchema", "true").option("header","True").csv("./data/category.csv")
df_product = spark.read.option("inferSchema", "true").option("header","True").csv("./data/product.csv")
df_receipt = spark.read.option("inferSchema", "true").option("header","True").csv("./data/receipt.csv")
df_store = spark.read.option("inferSchema", "true").option("header","True").csv("./data/store.csv")
df_geocode = spark.read.option("inferSchema", "true").option("header","True").csv("./data/geocode.csv")

# 演習問題

---
> P-001: レシート明細のデータフレーム（df_receipt）から全項目の先頭10件を表示し、どのようなデータを保有しているか目視で確認せよ。

In [4]:
df_receipt.show(20)

+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+
|sales_ymd|sales_epoch|store_cd|receipt_no|receipt_sub_no|   customer_id|product_cd|quantity|amount|
+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+
| 20181103| 1257206400|  S14006|       112|             1|CS006214000001|P070305012|       1|   158|
| 20181118| 1258502400|  S13008|      1132|             2|CS008415000097|P070701017|       1|    81|
| 20170712| 1215820800|  S14028|      1102|             1|CS028414000014|P060101005|       1|   170|
| 20190205| 1265328000|  S14042|      1132|             1|ZZ000000000000|P050301001|       1|    25|
| 20180821| 1250812800|  S14025|      1102|             2|CS025415000050|P060102007|       1|    90|
| 20190605| 1275696000|  S13003|      1112|             1|CS003515000195|P050102002|       1|   138|
| 20181205| 1259971200|  S14024|      1102|             2|CS024514000042|P080101005|       

---
> P-002: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、10件表示させよ。

In [5]:
df_receipt.select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20181103|CS006214000001|P070305012|   158|
| 20181118|CS008415000097|P070701017|    81|
| 20170712|CS028414000014|P060101005|   170|
| 20190205|ZZ000000000000|P050301001|    25|
| 20180821|CS025415000050|P060102007|    90|
| 20190605|CS003515000195|P050102002|   138|
| 20181205|CS024514000042|P080101005|    30|
| 20190922|CS040415000178|P070501004|   128|
| 20170504|ZZ000000000000|P071302010|   770|
| 20191010|CS027514000015|P071101003|   680|
+---------+--------------+----------+------+
only showing top 10 rows



---
> P-003: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、10件表示させよ。ただし、sales_ymdはsales_dateに項目名を変更しながら抽出すること。

In [6]:
df_receipt.select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).withColumnRenamed("sales_ymd","sales_date")\
    .show(10)

+----------+--------------+----------+------+
|sales_date|   customer_id|product_cd|amount|
+----------+--------------+----------+------+
|  20181103|CS006214000001|P070305012|   158|
|  20181118|CS008415000097|P070701017|    81|
|  20170712|CS028414000014|P060101005|   170|
|  20190205|ZZ000000000000|P050301001|    25|
|  20180821|CS025415000050|P060102007|    90|
|  20190605|CS003515000195|P050102002|   138|
|  20181205|CS024514000042|P080101005|    30|
|  20190922|CS040415000178|P070501004|   128|
|  20170504|ZZ000000000000|P071302010|   770|
|  20191010|CS027514000015|P071101003|   680|
+----------+--------------+----------+------+
only showing top 10 rows



---
> P-004: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"

In [7]:
df_receipt.where("customer_id == 'CS018205000001'")\
    .select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20180414|CS018205000001|P060104007|   600|
| 20170614|CS018205000001|P050206001|   990|
| 20170614|CS018205000001|P060702015|   108|
| 20190216|CS018205000001|P071005024|   102|
| 20180414|CS018205000001|P071101002|   278|
| 20190226|CS018205000001|P070902035|   168|
| 20190924|CS018205000001|P060805001|   495|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+
only showing top 10 rows



---
> P-005: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上

In [8]:
df_receipt.where("customer_id == 'CS018205000001' AND amount >= 1000")\
    .select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+



---
> P-006: レシート明細データフレーム「df_receipt」から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上数量（quantity）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上または売上数量（quantity）が5以上

In [9]:
df_receipt.where("customer_id=='CS018205000001' AND (amount>=1000 OR quantity>=5)")\
    .select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "quantity",
    "amount"
    ).show(10)

+---------+--------------+----------+--------+------+
|sales_ymd|   customer_id|product_cd|quantity|amount|
+---------+--------------+----------+--------+------+
| 20180911|CS018205000001|P071401012|       1|  2200|
| 20180414|CS018205000001|P060104007|       6|   600|
| 20170614|CS018205000001|P050206001|       5|   990|
| 20190226|CS018205000001|P071401020|       1|  2200|
| 20180911|CS018205000001|P071401005|       1|  1100|
+---------+--------------+----------+--------+------+



---
> P-007: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上2,000以下

In [10]:
df_receipt.where("customer_id=='CS018205000001' AND (1000<=amount AND amount<=2000)")\
    .select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+



---
> P-008: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 商品コード（product_cd）が"P071401019"以外

In [11]:
df_receipt.where("customer_id=='CS018205000001' AND product_cd!='P071401019'")\
    .select(
    "sales_ymd",
    "customer_id",
    "product_cd",
    "amount"
    ).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20180414|CS018205000001|P060104007|   600|
| 20170614|CS018205000001|P050206001|   990|
| 20170614|CS018205000001|P060702015|   108|
| 20190216|CS018205000001|P071005024|   102|
| 20180414|CS018205000001|P071101002|   278|
| 20190226|CS018205000001|P070902035|   168|
| 20190924|CS018205000001|P060805001|   495|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+
only showing top 10 rows



---
> P-009: 以下の処理において、出力結果を変えずにORをANDに書き換えよ。

```
df_store.select(
    "*"
    ).where("NOT(prefecture_cd == '13' OR floor_area > 900)")\
    .show(10)
```

In [12]:
df_store.select(
    "*"
    ).where("prefecture_cd != '13' AND floor_area <= 900")\
    .first()

Row(store_cd='S14046', store_name='北山田店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市都筑区北山田一丁目', address_kana='カナガワケンヨコハマシツヅキクキタヤマタイッチョウメ', tel_no='045-123-4049', longitude=139.5916, latitude=35.56189, floor_area=831.0)

---
> P-010: 店舗データフレーム（df_store）から、店舗コード（store_cd）が"S14"で始まるものだけ全項目抽出し、10件だけ表示せよ。

In [13]:
df_store.select(
    "*"
    ).where("store_cd LIKE 'S14%'")\
    .take(10)

[Row(store_cd='S14010', store_name='菊名店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市港北区菊名一丁目', address_kana='カナガワケンヨコハマシコウホククキクナイッチョウメ', tel_no='045-123-4032', longitude=139.6326, latitude=35.50049, floor_area=1732.0),
 Row(store_cd='S14033', store_name='阿久和店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市瀬谷区阿久和西一丁目', address_kana='カナガワケンヨコハマシセヤクアクワニシイッチョウメ', tel_no='045-123-4043', longitude=139.4961, latitude=35.45918, floor_area=1495.0),
 Row(store_cd='S14036', store_name='相模原中央店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県相模原市中央二丁目', address_kana='カナガワケンサガミハラシチュウオウニチョウメ', tel_no='042-123-4045', longitude=139.3716, latitude=35.57327, floor_area=1679.0),
 Row(store_cd='S14040', store_name='長津田店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市緑区長津田みなみ台五丁目', address_kana='カナガワケンヨコハマシミドリクナガツタミナミダイゴチョウメ', tel_no='045-123-4046', longitude=139.4994, latitude=35.52398, floor_area=1548.0),
 Row(store_cd='S14050', store_name='阿久和西店', prefecture_cd=14, prefectu

---
> P-011: 顧客データフレーム（df_customer）から顧客ID（customer_id）の末尾が1のものだけ全項目抽出し、10件だけ表示せよ。

In [14]:
df_customer.select(
    "*"
    ).where("customer_id LIKE '%1'")\
    .take(10)

[Row(customer_id='CS037613000071', customer_name='六角 雅彦', gender_cd=9, gender='不明', birth_day='1952-04-01', age=66, postal_cd='136-0076', address='東京都江東区南砂**********', application_store_cd='S13037', application_date=20150414, status_cd='0-00000000-0'),
 Row(customer_id='CS028811000001', customer_name='堀井 かおり', gender_cd=1, gender='女性', birth_day='1933-03-27', age=86, postal_cd='245-0016', address='神奈川県横浜市泉区和泉町**********', application_store_cd='S14028', application_date=20160115, status_cd='0-00000000-0'),
 Row(customer_id='CS040412000191', customer_name='川井 郁恵', gender_cd=1, gender='女性', birth_day='1977-01-05', age=42, postal_cd='226-0021', address='神奈川県横浜市緑区北八朔町**********', application_store_cd='S14040', application_date=20151101, status_cd='1-20091025-4'),
 Row(customer_id='CS028314000011', customer_name='小菅 あおい', gender_cd=1, gender='女性', birth_day='1983-11-26', age=35, postal_cd='246-0038', address='神奈川県横浜市瀬谷区宮沢**********', application_store_cd='S14028', application_date=20151123, 

---
> P-012: 店舗データフレーム（df_store）から横浜市の店舗だけ全項目表示せよ。

In [15]:
df_store.select(
    "*"
    ).where("address LIKE '%横浜市%'")\
    .collect()

[Row(store_cd='S14010', store_name='菊名店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市港北区菊名一丁目', address_kana='カナガワケンヨコハマシコウホククキクナイッチョウメ', tel_no='045-123-4032', longitude=139.6326, latitude=35.50049, floor_area=1732.0),
 Row(store_cd='S14033', store_name='阿久和店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市瀬谷区阿久和西一丁目', address_kana='カナガワケンヨコハマシセヤクアクワニシイッチョウメ', tel_no='045-123-4043', longitude=139.4961, latitude=35.45918, floor_area=1495.0),
 Row(store_cd='S14040', store_name='長津田店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市緑区長津田みなみ台五丁目', address_kana='カナガワケンヨコハマシミドリクナガツタミナミダイゴチョウメ', tel_no='045-123-4046', longitude=139.4994, latitude=35.52398, floor_area=1548.0),
 Row(store_cd='S14050', store_name='阿久和西店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市瀬谷区阿久和西一丁目', address_kana='カナガワケンヨコハマシセヤクアクワニシイッチョウメ', tel_no='045-123-4053', longitude=139.4961, latitude=35.45918, floor_area=1830.0),
 Row(store_cd='S14028', store_name='二ツ橋店', prefecture_cd=14, pre

---
> P-013: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の先頭がアルファベットのA〜Fで始まるデータを全項目抽出し、10件だけ表示せよ。

In [16]:
from pyspark.sql.functions import col
df_customer.select(
    "*"
    ).where(col("status_cd").rlike("^[A-F]"))\
    .take(10)

[Row(customer_id='CS031415000172', customer_name='宇多田 貴美子', gender_cd=1, gender='女性', birth_day='1976-10-04', age=42, postal_cd='151-0053', address='東京都渋谷区代々木**********', application_store_cd='S13031', application_date=20150529, status_cd='D-20100325-C'),
 Row(customer_id='CS015414000103', customer_name='奥野 陽子', gender_cd=1, gender='女性', birth_day='1977-08-09', age=41, postal_cd='136-0073', address='東京都江東区北砂**********', application_store_cd='S13015', application_date=20150722, status_cd='B-20100609-B'),
 Row(customer_id='CS011215000048', customer_name='芦田 沙耶', gender_cd=1, gender='女性', birth_day='1992-02-01', age=27, postal_cd='223-0062', address='神奈川県横浜市港北区日吉本町**********', application_store_cd='S14011', application_date=20150228, status_cd='C-20100421-9'),
 Row(customer_id='CS029415000023', customer_name='梅田 里穂', gender_cd=1, gender='女性', birth_day='1976-01-17', age=43, postal_cd='279-0043', address='千葉県浦安市富士見**********', application_store_cd='S12029', application_date=20150610, statu

---
> P-014: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の末尾が数字の1〜9で終わるデータを全項目抽出し、10件だけ表示せよ。

In [17]:
df_customer.select(
    "*"
    ).where(col("status_cd").rlike("[1-9]$"))\
    .take(10)

[Row(customer_id='CS001215000145', customer_name='田崎 美紀', gender_cd=1, gender='女性', birth_day='1995-03-29', age=24, postal_cd='144-0055', address='東京都大田区仲六郷**********', application_store_cd='S13001', application_date=20170605, status_cd='6-20090929-2'),
 Row(customer_id='CS033513000180', customer_name='安斎 遥', gender_cd=1, gender='女性', birth_day='1962-07-11', age=56, postal_cd='241-0823', address='神奈川県横浜市旭区善部町**********', application_store_cd='S14033', application_date=20150728, status_cd='6-20080506-5'),
 Row(customer_id='CS011215000048', customer_name='芦田 沙耶', gender_cd=1, gender='女性', birth_day='1992-02-01', age=27, postal_cd='223-0062', address='神奈川県横浜市港北区日吉本町**********', application_store_cd='S14011', application_date=20150228, status_cd='C-20100421-9'),
 Row(customer_id='CS040412000191', customer_name='川井 郁恵', gender_cd=1, gender='女性', birth_day='1977-01-05', age=42, postal_cd='226-0021', address='神奈川県横浜市緑区北八朔町**********', application_store_cd='S14040', application_date=20151101, 

---
> P-015: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の先頭がアルファベットのA〜Fで始まり、末尾が数字の1〜9で終わるデータを全項目抽出し、10件だけ表示せよ。

In [18]:
df_customer.select(
    "*"
    ).where(col("status_cd").rlike("^[A-F].*[1-9]$"))\
    .take(10)

[Row(customer_id='CS011215000048', customer_name='芦田 沙耶', gender_cd=1, gender='女性', birth_day='1992-02-01', age=27, postal_cd='223-0062', address='神奈川県横浜市港北区日吉本町**********', application_store_cd='S14011', application_date=20150228, status_cd='C-20100421-9'),
 Row(customer_id='CS022513000105', customer_name='島村 貴美子', gender_cd=1, gender='女性', birth_day='1962-03-12', age=57, postal_cd='249-0002', address='神奈川県逗子市山の根**********', application_store_cd='S14022', application_date=20150320, status_cd='A-20091115-7'),
 Row(customer_id='CS001515000096', customer_name='水野 陽子', gender_cd=9, gender='不明', birth_day='1960-11-29', age=58, postal_cd='144-0053', address='東京都大田区蒲田本町**********', application_store_cd='S13001', application_date=20150614, status_cd='A-20100724-7'),
 Row(customer_id='CS013615000053', customer_name='西脇 季衣', gender_cd=1, gender='女性', birth_day='1953-10-18', age=65, postal_cd='261-0026', address='千葉県千葉市美浜区幕張西**********', application_store_cd='S12013', application_date=20150128, 

---
> P-016: 店舗データフレーム（df_store）から、電話番号（tel_no）が3桁-3桁-4桁のデータを全項目表示せよ。

In [19]:
df_store.select(
    "*"
    ).where(col("tel_no").rlike("[0-9]{3}-[0-9]{3}-[0-9]{4}"))\
    .take(10)

[Row(store_cd='S12014', store_name='千草台店', prefecture_cd=12, prefecture='千葉県', address='千葉県千葉市稲毛区千草台一丁目', address_kana='チバケンチバシイナゲクチグサダイイッチョウメ', tel_no='043-123-4003', longitude=140.118, latitude=35.63559, floor_area=1698.0),
 Row(store_cd='S13002', store_name='国分寺店', prefecture_cd=13, prefecture='東京都', address='東京都国分寺市本多二丁目', address_kana='トウキョウトコクブンジシホンダニチョウメ', tel_no='042-123-4008', longitude=139.4802, latitude=35.70566, floor_area=1735.0),
 Row(store_cd='S14010', store_name='菊名店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市港北区菊名一丁目', address_kana='カナガワケンヨコハマシコウホククキクナイッチョウメ', tel_no='045-123-4032', longitude=139.6326, latitude=35.50049, floor_area=1732.0),
 Row(store_cd='S14033', store_name='阿久和店', prefecture_cd=14, prefecture='神奈川県', address='神奈川県横浜市瀬谷区阿久和西一丁目', address_kana='カナガワケンヨコハマシセヤクアクワニシイッチョウメ', tel_no='045-123-4043', longitude=139.4961, latitude=35.45918, floor_area=1495.0),
 Row(store_cd='S14036', store_name='相模原中央店', prefecture_cd=14, prefecture='神奈川県', address

---
> P-17: 顧客データフレーム（df_customer）を生年月日（birth_day）で高齢順にソートし、先頭10件を全項目表示せよ。

In [20]:
df_customer.sort("birth_day").show(10)

+--------------+-------------+---------+------+----------+---+---------+--------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender| birth_day|age|postal_cd|                         address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+----------+---+---------+--------------------------------+--------------------+----------------+------------+
|CS003813000014|  村山 菜々美|        1|  女性|1928-11-26| 90| 182-0007|    東京都調布市菊野台**********|              S13003|        20160214|0-00000000-0|
|CS026813000004|    吉村 朝陽|        1|  女性|1928-12-14| 90| 251-0043| 神奈川県藤沢市辻堂元町******...|              S14026|        20150723|0-00000000-0|
|CS018811000003|    熊沢 美里|        1|  女性|1929-01-07| 90| 204-0004|      東京都清瀬市野塩**********|              S13018|        20150403|0-00000000-0|
|CS027803000004|    内村 拓郎|        0|  男性|1929-01-12| 90| 251-0031|神奈川県藤沢市鵠沼藤が谷*****...|              S140

---
> P-18: 顧客データフレーム（df_customer）を生年月日（birth_day）で若い順にソートし、先頭10件を全項目表示せよ。

In [21]:
from pyspark.sql.functions import desc
df_customer.sort(desc("birth_day")).show(10)

+--------------+-------------+---------+------+----------+---+---------+---------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender| birth_day|age|postal_cd|                          address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+----------+---+---------+---------------------------------+--------------------+----------------+------------+
|CS035114000004|    大村 美里|        1|  女性|2007-11-25| 11| 156-0053|       東京都世田谷区桜**********|              S13035|        20150619|6-20091205-6|
|CS022103000002|  福山 はじめ|        9|  不明|2007-10-02| 11| 249-0006|     神奈川県逗子市逗子**********|              S14022|        20160909|0-00000000-0|
|CS002113000009|  柴田 真悠子|        1|  女性|2007-09-17| 11| 184-0014|  東京都小金井市貫井南町******...|              S13002|        20160304|0-00000000-0|
|CS004115000014|    松井 京子|        1|  女性|2007-08-09| 11| 165-0031|     東京都中野区上鷺宮**********|         

---
> P-19: レシート明細データフレーム（df_receipt）に対し、1件あたりの売上金額（amount）が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID（customer_id）、売上金額（amount）、付与したランクを表示させること。なお、売上金額（amount）が等しい場合は同一順位を付与するものとする。

In [22]:
from pyspark.sql import Window
from pyspark.sql.functions import rank
window = Window.partitionBy().orderBy(desc("amount"))
df_receipt.select(
    "customer_id",
    "amount",
    rank().over(window).alias("rank")
    ).show(10)

+--------------+------+----+
|   customer_id|amount|rank|
+--------------+------+----+
|CS011415000006| 10925|   1|
|ZZ000000000000|  6800|   2|
|CS028605000002|  5780|   3|
|CS015515000034|  5480|   4|
|ZZ000000000000|  5480|   4|
|ZZ000000000000|  5480|   4|
|ZZ000000000000|  5440|   7|
|CS021515000089|  5440|   7|
|CS015515000083|  5280|   9|
|CS017414000114|  5280|   9|
+--------------+------+----+
only showing top 10 rows



---
> P-020: レシート明細データフレーム（df_receipt）に対し、1件あたりの売上金額（amount）が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID（customer_id）、売上金額（amount）、付与したランクを表示させること。なお、売上金額（amount）が等しい場合でも別順位を付与すること。

In [23]:
from pyspark.sql.functions import row_number
window = Window.partitionBy().orderBy(desc("amount"))
df_receipt.select(
    "customer_id",
    "amount",
    row_number().over(window).alias("rank")
    ).show(10)

+--------------+------+----+
|   customer_id|amount|rank|
+--------------+------+----+
|CS011415000006| 10925|   1|
|ZZ000000000000|  6800|   2|
|CS028605000002|  5780|   3|
|CS015515000034|  5480|   4|
|ZZ000000000000|  5480|   5|
|ZZ000000000000|  5480|   6|
|ZZ000000000000|  5440|   7|
|CS021515000089|  5440|   8|
|CS015515000083|  5280|   9|
|CS017414000114|  5280|  10|
+--------------+------+----+
only showing top 10 rows



---
> P-021: レシート明細データフレーム（df_receipt）に対し、件数をカウントせよ。

In [24]:
df_receipt.count()

104681

---
> P-022: レシート明細データフレーム（df_receipt）の顧客ID（customer_id）に対し、ユニーク件数をカウントせよ。

In [25]:
df_receipt.select("customer_id").distinct().count()

8307

---
> P-023: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）と売上数量（quantity）を合計せよ。

In [26]:
from pyspark.sql.functions import sum
df_receipt.groupby("store_cd").sum()\
    .select(
        "store_cd",
        "sum(amount)",
        "sum(quantity)"
    ).show(10)

+--------+-----------+-------------+
|store_cd|sum(amount)|sum(quantity)|
+--------+-----------+-------------+
|  S14024|     736323|         2417|
|  S14045|     458484|         1398|
|  S13020|     796383|         2383|
|  S13038|     708884|         2337|
|  S14047|     338329|         1041|
|  S12014|     725167|         2358|
|  S13037|     693087|         2344|
|  S13051|     107452|          354|
|  S12013|     787513|         2425|
|  S14046|     412646|         1354|
+--------+-----------+-------------+
only showing top 10 rows



---
> P-024: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も新しい売上日（sales_ymd）を求め、10件表示せよ。

In [27]:
df_receipt.select(
    "customer_id",
    "sales_ymd"
    ).groupby("customer_id")\
    .max()\
    .show(10)

+--------------+--------------+
|   customer_id|max(sales_ymd)|
+--------------+--------------+
|CS024415000195|      20190907|
|CS011515000044|      20181226|
|CS017415000245|      20190615|
|CS011512000113|      20190829|
|CS025414000093|      20191016|
|CS010605000007|      20181009|
|CS011415000097|      20180909|
|CS018515000090|      20190717|
|CS015415000222|      20190928|
|CS038605000003|      20190910|
+--------------+--------------+
only showing top 10 rows



---
> P-025: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も古い売上日（sales_ymd）を求め、10件表示せよ。

In [28]:
df_receipt.select(
    "customer_id",
    "sales_ymd"
    ).groupby("customer_id")\
    .min()\
    .show(10)

+--------------+--------------+
|   customer_id|min(sales_ymd)|
+--------------+--------------+
|CS024415000195|      20170119|
|CS011515000044|      20170505|
|CS017415000245|      20180110|
|CS011512000113|      20181213|
|CS025414000093|      20170806|
|CS010605000007|      20170125|
|CS011415000097|      20170108|
|CS018515000090|      20170131|
|CS015415000222|      20170313|
|CS038605000003|      20170321|
+--------------+--------------+
only showing top 10 rows



---
> P-026: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も新しい売上日（sales_ymd）と古い売上日を求め、両者が異なるデータを10件表示せよ。

In [29]:
df_receipt.createOrReplaceTempView("receipt")
spark.sql("""
SELECT 
    customer_id, 
    MAX(sales_ymd), 
    MIN(sales_ymd)
FROM 
    receipt
GROUP BY 
    customer_id
HAVING 
    MAX(sales_ymd) != MIN(sales_ymd)
    LIMIT 10
""").show(10)

+--------------+--------------+--------------+
|   customer_id|max(sales_ymd)|min(sales_ymd)|
+--------------+--------------+--------------+
|CS024415000195|      20190907|      20170119|
|CS011515000044|      20181226|      20170505|
|CS017415000245|      20190615|      20180110|
|CS011512000113|      20190829|      20181213|
|CS025414000093|      20191016|      20170806|
|CS010605000007|      20181009|      20170125|
|CS011415000097|      20180909|      20170108|
|CS018515000090|      20190717|      20170131|
|CS015415000222|      20190928|      20170313|
|CS038605000003|      20190910|      20170321|
+--------------+--------------+--------------+



---
> P-027: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の平均を計算し、降順でTOP5を表示せよ。

In [30]:
from pyspark.sql.functions import avg
df_receipt.select(
    "store_cd",
    "amount"
    ).groupby("store_cd")\
    .avg()\
    .sort(desc("avg(amount)"))\
    .show(5)

+--------+------------------+
|store_cd|       avg(amount)|
+--------+------------------+
|  S13052|402.86746987951807|
|  S13015|351.11196043165467|
|  S13003| 350.9155188246097|
|  S14010|348.79126213592235|
|  S13001| 348.4703862660944|
+--------+------------------+
only showing top 5 rows



---
> P-028: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の中央値を計算し、降順でTOP5を表示せよ。

In [31]:
import pyspark.sql.functions as F
median = F.expr('percentile_approx(amount, 0.5)')
df_receipt.select(
    "store_cd",
    "amount"
    ).groupby("store_cd")\
    .agg(median.alias("median"))\
    .sort(desc("median"))\
    .show(5)

+--------+------+
|store_cd|median|
+--------+------+
|  S13052|   190|
|  S14010|   188|
|  S14050|   185|
|  S13018|   180|
|  S14040|   180|
+--------+------+
only showing top 5 rows



---
> P-029: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに商品コード（product_cd）の最頻値を求めよ。

In [32]:
# あまりよい答えではないと思います。。。
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

w = Window().partitionBy("store_cd").orderBy(col("count").desc())

df_receipt.groupby("store_cd","product_cd")\
    .count()\
    .withColumn("rn", row_number().over(w))\
    .where(col("rn") == 1)\
    .select("store_cd", "product_cd", "count")\
    .show(10)

+--------+----------+-----+
|store_cd|product_cd|count|
+--------+----------+-----+
|  S14024|P060303001|   96|
|  S14045|P060303001|   33|
|  S13020|P071401001|   79|
|  S13038|P060303001|   41|
|  S14047|P060303001|   36|
|  S12014|P060303001|   65|
|  S13037|P060303001|   88|
|  S13051|P080804001|    5|
|  S12013|P060303001|  107|
|  S14046|P060303001|   71|
+--------+----------+-----+
only showing top 10 rows



---
> P-030: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の標本分散を計算し、降順でTOP5を表示せよ。

In [33]:
from pyspark.sql.functions import var_samp
df_receipt.select(
    "store_cd",
    "amount"
    ).groupby("store_cd")\
    .agg(var_samp(col("amount")).alias("variance"))\
    .sort(desc("variance"))\
    .show(10)

+--------+------------------+
|store_cd|          variance|
+--------+------------------+
|  S13052|441863.25252623414|
|  S14011|306442.24243156885|
|  S14034|297068.39274006087|
|  S13001| 295558.8426177121|
|  S13015|295427.19708585314|
|  S13035| 278121.4683233888|
|  S14045| 266642.7122792716|
|  S14046|264460.63579109305|
|  S14010|258568.36391478623|
|  S14021|254123.83834690147|
+--------+------------------+
only showing top 10 rows



---
> P-031: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の標本標準偏差を計算し、降順でTOP5を表示せよ。

In [34]:
from pyspark.sql.functions import stddev_samp
df_receipt.select(
    "store_cd",
    "amount"
    ).groupby("store_cd")\
    .agg(stddev_samp(col("amount")).alias("std"))\
    .sort(desc("std"))\
    .show(10)

+--------+------------------+
|store_cd|               std|
+--------+------------------+
|  S13052| 664.7279537722437|
|  S14011| 553.5722558361906|
|  S14034| 545.0398083994057|
|  S13001| 543.6532374756101|
|  S13015| 543.5321490821432|
|  S13035| 527.3722293820455|
|  S14045| 516.3745852375692|
|  S14046| 514.2573633805287|
|  S14010| 508.4961788595724|
|  S14021|504.10697113499776|
+--------+------------------+
only showing top 10 rows



---
> P-032: レシート明細データフレーム（df_receipt）の売上金額（amount）について、25％刻みでパーセンタイル値を求めよ。

In [35]:
df_receipt.stat.approxQuantile("amount", [0.25,0.5,0.75], 0) 

[102.0, 170.0, 288.0]

---
> P-033: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の平均を計算し、330以上のものを抽出せよ。

In [36]:
df_receipt.select(
    "store_cd",
    "amount"
    ).groupby("store_cd")\
    .avg()\
    .where("avg(amount) >= 330")\
    .show(5)

+--------+------------------+
|store_cd|       avg(amount)|
+--------+------------------+
|  S14045| 330.0820734341253|
|  S13020|  337.879932117098|
|  S14047| 330.0770731707317|
|  S12013|330.19412997903567|
|  S14011| 335.7183333333333|
+--------+------------------+
only showing top 5 rows



---
> P-034: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに売上金額（amount）を合計して全顧客の平均を求めよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。


In [37]:
df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .avg()\
    .show()

+-----------------+
| avg(sum(amount))|
+-----------------+
|2547.742234529256|
+-----------------+



---
> P-035: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに売上金額（amount）を合計して全顧客の平均を求め、平均以上に買い物をしている顧客を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、データは10件だけ表示させれば良い。

In [38]:
avg = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .avg()\
    .first()[0]
    
df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .where(col("sum(amount)") >= avg)\
    .show(10)

+--------------+-----------+
|   customer_id|sum(amount)|
+--------------+-----------+
|CS024415000195|       4638|
|CS025414000093|       3929|
|CS010605000007|       6420|
|CS011415000097|       3462|
|CS018515000090|       3756|
|CS015415000222|      11472|
|CS038605000003|       2833|
|CS049513000008|       4450|
|CS017514000012|       4699|
|CS020515000105|       5283|
+--------------+-----------+
only showing top 10 rows



---
> P-036: レシート明細データフレーム（df_receipt）と店舗データフレーム（df_store）を内部結合し、レシート明細データフレームの全項目と店舗データフレームの店舗名（store_name）を10件表示させよ。

In [39]:
df_receipt.join(df_store.select("store_cd","store_name"), "store_cd", "inner")\
    .show(10)

+--------+---------+-----------+----------+--------------+--------------+----------+--------+------+----------+
|store_cd|sales_ymd|sales_epoch|receipt_no|receipt_sub_no|   customer_id|product_cd|quantity|amount|store_name|
+--------+---------+-----------+----------+--------------+--------------+----------+--------+------+----------+
|  S14006| 20181103| 1257206400|       112|             1|CS006214000001|P070305012|       1|   158|  葛が谷店|
|  S13008| 20181118| 1258502400|      1132|             2|CS008415000097|P070701017|       1|    81|    成城店|
|  S14028| 20170712| 1215820800|      1102|             1|CS028414000014|P060101005|       1|   170|  二ツ橋店|
|  S14042| 20190205| 1265328000|      1132|             1|ZZ000000000000|P050301001|       1|    25|  新山下店|
|  S14025| 20180821| 1250812800|      1102|             2|CS025415000050|P060102007|       1|    90|    大和店|
|  S13003| 20190605| 1275696000|      1112|             1|CS003515000195|P050102002|       1|   138|    狛江店|
|  S14024| 20

---
> P-037: 商品データフレーム（df_product）とカテゴリデータフレーム（df_category）を内部結合し、商品データフレームの全項目とカテゴリデータフレームの小区分名（category_small_name）を10件表示させよ。

In [40]:
df_product.join(df_category.select("category_small_cd","category_small_name"), "category_small_cd", "inner")\
    .show(10)

+-----------------+----------+-----------------+------------------+----------+---------+-------------------+
|category_small_cd|product_cd|category_major_cd|category_medium_cd|unit_price|unit_cost|category_small_name|
+-----------------+----------+-----------------+------------------+----------+---------+-------------------+
|            40101|P040101001|                4|               401|       198|      149|             弁当類|
|            40101|P040101002|                4|               401|       218|      164|             弁当類|
|            40101|P040101003|                4|               401|       230|      173|             弁当類|
|            40101|P040101004|                4|               401|       248|      186|             弁当類|
|            40101|P040101005|                4|               401|       268|      201|             弁当類|
|            40101|P040101006|                4|               401|       298|      224|             弁当類|
|            40101|P040101007|       

---
> P-038: 顧客データフレーム（df_customer）とレシート明細データフレーム（df_receipt）から、各顧客ごとの売上金額合計を求めよ。ただし、買い物の実績がない顧客については売上金額を0として表示させること。また、顧客は性別コード（gender_cd）が女性（1）であるものを対象とし、非会員（顧客IDが'Z'から始まるもの）は除外すること。なお、結果は10件だけ表示させれば良い。

In [41]:
df_amount = df_receipt.select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()

df_customer.where((col("gender_cd")==1) & (~col("customer_id").rlike("^Z")))\
    .join(df_amount, "customer_id", "left")\
    .na.fill(0)\
    .select("customer_id","sum(amount)")\
    .show(10)

+--------------+-----------+
|   customer_id|sum(amount)|
+--------------+-----------+
|CS021313000114|          0|
|CS031415000172|       5088|
|CS028811000001|          0|
|CS001215000145|        875|
|CS015414000103|       3122|
|CS033513000180|        868|
|CS035614000014|          0|
|CS011215000048|       3444|
|CS009413000079|          0|
|CS040412000191|        210|
+--------------+-----------+
only showing top 10 rows



---
> P-039: レシート明細データフレーム（df_receipt）から売上日数の多い顧客の上位20件と、売上金額合計の多い顧客の上位20件を抽出し、完全外部結合せよ。ただし、非会員（顧客IDが'Z'から始まるもの）は除外すること。

In [42]:
df_freq = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select("customer_id","sales_ymd")\
    .groupby("customer_id")\
    .count()\
    .sort(desc("count"))\
    .limit(20)

df_sum = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select("customer_id","amount")\
    .groupby("customer_id")\
    .sum()\
    .sort(desc("sum(amount)"))\
    .limit(20)

df_freq.join(df_sum, "customer_id", "outer")\
    .show()

+--------------+-----+-----------+
|   customer_id|count|sum(amount)|
+--------------+-----+-----------+
|CS001605000009| null|      18925|
|CS006515000023| null|      18372|
|CS007514000094| null|      15735|
|CS007515000107|   40|       null|
|CS009414000059| null|      15492|
|CS010214000002|   42|       null|
|CS010214000010|   44|      18585|
|CS011414000106| null|      18338|
|CS011415000006| null|      16094|
|CS014214000023|   38|       null|
|CS014415000077|   38|       null|
|CS015415000185|   44|      20153|
|CS015515000034| null|      15300|
|CS016415000101| null|      16348|
|CS016415000141|   40|      18372|
|CS017415000097|   40|      23086|
|CS021514000008|   36|       null|
|CS021514000045|   40|       null|
|CS021515000089| null|      17580|
|CS021515000172|   38|       null|
+--------------+-----+-----------+
only showing top 20 rows



---
> P-040: 全ての店舗と全ての商品を組み合わせると何件のデータとなるか調査したい。店舗（df_store）と商品（df_product）を直積した件数を計算せよ。

In [43]:
df_store.crossJoin(df_product).count()

531590

---
> P-041: レシート明細データフレーム（df_receipt）の売上金額（amount）を日付（sales_ymd）ごとに集計し、前日からの売上金額増減を計算せよ。なお、計算結果は10件表示すればよい。

In [44]:
from pyspark.sql.functions import lag
window = Window.orderBy("sales_ymd")  
df_receipt.select("sales_ymd","amount")\
    .groupby("sales_ymd")\
    .sum()\
    .withColumn("lag amount", lag("sum(amount)").over(window))\
    .show(10)

+---------+--------------+-----------+----------+
|sales_ymd|sum(sales_ymd)|sum(amount)|lag amount|
+---------+--------------+-----------+----------+
| 20170101|    1795138989|      33723|      null|
| 20170102|    1734628772|      24165|     33723|
| 20170103|    1815309270|      27503|     24165|
| 20170104|    2057350608|      36165|     27503|
| 20170105|    1694288820|      37830|     36165|
| 20170106|    1593438374|      32387|     37830|
| 20170107|    1734629202|      23415|     32387|
| 20170108|    1916160260|      24737|     23415|
| 20170109|    1956500573|      26718|     24737|
| 20170110|    1432077810|      20143|     26718|
+---------+--------------+-----------+----------+
only showing top 10 rows



---
> P-042: レシート明細データフレーム（df_receipt）の売上金額（amount）を日付（sales_ymd）ごとに集計し、各日付のデータに対し、１日前、２日前、３日前のデータを結合せよ。結果は10件表示すればよい。

In [45]:
from pyspark.sql.functions import lag
window = Window.orderBy("sales_ymd")  
df_receipt.select("sales_ymd","amount")\
    .groupby("sales_ymd")\
    .sum()\
    .select(
    "*",
    lag("sum(amount)").over(window).alias("1 days ago"),
    lag("sum(amount)",2).over(window).alias("2 days ago"),
    lag("sum(amount)",3).over(window).alias("3 days ago")
    ).show(10)

+---------+--------------+-----------+----------+----------+----------+
|sales_ymd|sum(sales_ymd)|sum(amount)|1 days ago|2 days ago|3 days ago|
+---------+--------------+-----------+----------+----------+----------+
| 20170101|    1795138989|      33723|      null|      null|      null|
| 20170102|    1734628772|      24165|     33723|      null|      null|
| 20170103|    1815309270|      27503|     24165|     33723|      null|
| 20170104|    2057350608|      36165|     27503|     24165|     33723|
| 20170105|    1694288820|      37830|     36165|     27503|     24165|
| 20170106|    1593438374|      32387|     37830|     36165|     27503|
| 20170107|    1734629202|      23415|     32387|     37830|     36165|
| 20170108|    1916160260|      24737|     23415|     32387|     37830|
| 20170109|    1956500573|      26718|     24737|     23415|     32387|
| 20170110|    1432077810|      20143|     26718|     24737|     23415|
+---------+--------------+-----------+----------+----------+----

---
> P-043： レシート明細データフレーム（df_receipt）と顧客データフレーム（df_customer）を結合し、性別（gender）と年代（ageから計算）ごとに売上金額（amount）を合計した売上サマリデータフレーム（df_sales_summary）を作成せよ。性別は0が男性、1が女性、9が不明を表すものとする。
>
> ただし、項目構成は年代、女性の売上金額、男性の売上金額、性別不明の売上金額の4項目とすること（縦に年代、横に性別のクロス集計）。また、年代は10歳ごとの階級とすること。

In [46]:
import math
from pyspark.sql.functions import udf

def calc_era(x):
    return math.floor(x/10)*10
calc_era_udf = udf(calc_era)

df_sales_summary = df_receipt.join(df_customer, "customer_id", "inner")\
    .withColumn("era", calc_era_udf(col("age")))\
    .select("era","gender", "amount")\
    .groupby("era")\
    .pivot("gender")\
    .agg(sum("amount"))\
    .sort("era")\

df_sales_summary.show()

+---+------+-------+------+
|era|  不明|   女性|  男性|
+---+------+-------+------+
| 10|  4317| 149836|  1591|
| 20| 44328|1363724| 72940|
| 30| 50441| 693047|177322|
| 40|483512|9320791| 19355|
| 50|342923|6685192| 54320|
| 60| 71418| 987741|272469|
| 70|  2427|  29764| 13435|
| 80|  5111| 262923| 46360|
| 90|  null|   6260|  null|
+---+------+-------+------+



---
> P-044： 前設問で作成した売上サマリデータフレーム（df_sales_summary）は性別の売上を横持ちさせたものであった。このデータフレームから性別を縦持ちさせ、年代、性別コード、売上金額の3項目に変換せよ。ただし、性別コードは男性を'00'、女性を'01'、不明を'99'とする。

In [47]:
#これもあまり明解ではない
df_tmp = df_sales_summary.withColumnRenamed("男性","m")\
    .withColumnRenamed("女性","f")\
    .withColumnRenamed("不明","n")\
    
column = df_tmp.columns
df_tmp.selectExpr(
    "era",
    f"""
    stack(
            {len(column)-1},
            {', '.join(f'"{c}", {c}' for c in column[1:])}
        ) 
    """
    ).withColumnRenamed("col0","gender")\
    .withColumnRenamed("col1","amount")\
    .replace({"m":"00","f":"01","n":"99"})\
    .show(30)

+---+------+-------+
|era|gender| amount|
+---+------+-------+
| 10|    99|   4317|
| 10|    01| 149836|
| 10|    00|   1591|
| 20|    99|  44328|
| 20|    01|1363724|
| 20|    00|  72940|
| 30|    99|  50441|
| 30|    01| 693047|
| 30|    00| 177322|
| 40|    99| 483512|
| 40|    01|9320791|
| 40|    00|  19355|
| 50|    99| 342923|
| 50|    01|6685192|
| 50|    00|  54320|
| 60|    99|  71418|
| 60|    01| 987741|
| 60|    00| 272469|
| 70|    99|   2427|
| 70|    01|  29764|
| 70|    00|  13435|
| 80|    99|   5111|
| 80|    01| 262923|
| 80|    00|  46360|
| 90|    99|   null|
| 90|    01|   6260|
| 90|    00|   null|
+---+------+-------+



参考: https://qiita.com/calderarie/items/9acd3d77e23484bd3bad

---
> P-045: 顧客データフレーム（df_customer）の生年月日（birth_day）は日付型（Date）でデータを保有している。これをYYYYMMDD形式の文字列に変換し、顧客ID（customer_id）とともに抽出せよ。データは10件を抽出すれば良い。

In [48]:
#文字列の置換になるので、問題の趣旨からは外れる
from pyspark.sql.functions import regexp_replace
df_customer.select(
    "customer_id",
    regexp_replace("birth_day", "(\d{4})-(\d{2})-(\d{2})", "$1$2$3").alias("birth_day")
    ).show(10)

+--------------+---------+
|   customer_id|birth_day|
+--------------+---------+
|CS021313000114| 19810429|
|CS037613000071| 19520401|
|CS031415000172| 19761004|
|CS028811000001| 19330327|
|CS001215000145| 19950329|
|CS020401000016| 19740915|
|CS015414000103| 19770809|
|CS029403000008| 19730817|
|CS015804000004| 19310502|
|CS033513000180| 19620711|
+--------------+---------+
only showing top 10 rows



---
> P-046: 顧客データフレーム（df_customer）の申し込み日（application_date）はYYYYMMD形式の数値型でデータを保有している。これを日付型（dateやdatetime）に変換し、顧客ID（customer_id）とともに抽出せよ。データは10件を抽出すれば良い。

In [49]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import StringType
df_customer.select(
    "customer_id",
    to_date(col("application_date").cast(StringType()),"yyyyMMdd").alias("application_date")
    ).show(10)

+--------------+----------------+
|   customer_id|application_date|
+--------------+----------------+
|CS021313000114|      2015-09-05|
|CS037613000071|      2015-04-14|
|CS031415000172|      2015-05-29|
|CS028811000001|      2016-01-15|
|CS001215000145|      2017-06-05|
|CS020401000016|      2015-02-25|
|CS015414000103|      2015-07-22|
|CS029403000008|      2015-05-15|
|CS015804000004|      2015-06-07|
|CS033513000180|      2015-07-28|
+--------------+----------------+
only showing top 10 rows



format: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

---
> P-047: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）はYYYYMMDD形式の数値型でデータを保有している。これを日付型（dateやdatetime）に変換し、レシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [50]:
df_receipt.select(
    "customer_id",
    to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("application_date")
    ).show(10)

+--------------+----------------+
|   customer_id|application_date|
+--------------+----------------+
|CS006214000001|      2018-11-03|
|CS008415000097|      2018-11-18|
|CS028414000014|      2017-07-12|
|ZZ000000000000|      2019-02-05|
|CS025415000050|      2018-08-21|
|CS003515000195|      2019-06-05|
|CS024514000042|      2018-12-05|
|CS040415000178|      2019-09-22|
|ZZ000000000000|      2017-05-04|
|CS027514000015|      2019-10-10|
+--------------+----------------+
only showing top 10 rows



---
> P-048: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）は数値型のUNIX秒でデータを保有している。これを日付型（dateやdatetime）に変換し、レシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [51]:
from pyspark.sql.functions import from_unixtime
df_receipt.select(
    "receipt_no",
    "receipt_sub_no",
    from_unixtime("sales_epoch").alias("sales_date")
    ).show(10)

+----------+--------------+-------------------+
|receipt_no|receipt_sub_no|         sales_date|
+----------+--------------+-------------------+
|       112|             1|2009-11-03 00:00:00|
|      1132|             2|2009-11-18 00:00:00|
|      1102|             1|2008-07-12 00:00:00|
|      1132|             1|2010-02-05 00:00:00|
|      1102|             2|2009-08-21 00:00:00|
|      1112|             1|2010-06-05 00:00:00|
|      1102|             2|2009-12-05 00:00:00|
|      1102|             1|2010-09-22 00:00:00|
|      1112|             2|2008-05-04 00:00:00|
|      1102|             1|2010-10-10 00:00:00|
+----------+--------------+-------------------+
only showing top 10 rows



---
> P-049: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"年"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [52]:
from pyspark.sql.functions import year
df_receipt.select(
    "receipt_no",
    "receipt_sub_no",
    year(from_unixtime("sales_epoch")).alias("sales_year")
    ).show(10)

+----------+--------------+----------+
|receipt_no|receipt_sub_no|sales_year|
+----------+--------------+----------+
|       112|             1|      2009|
|      1132|             2|      2009|
|      1102|             1|      2008|
|      1132|             1|      2010|
|      1102|             2|      2009|
|      1112|             1|      2010|
|      1102|             2|      2009|
|      1102|             1|      2010|
|      1112|             2|      2008|
|      1102|             1|      2010|
+----------+--------------+----------+
only showing top 10 rows



---
> P-050: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"月"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。なお、"月"は0埋め2桁で取り出すこと。データは10件を抽出すれば良い。

In [53]:
from pyspark.sql.functions import month, format_string
df_receipt.select(
    "receipt_no",
    "receipt_sub_no",
    format_string("%02d",month(from_unixtime("sales_epoch"))).alias("sales_month")
    ).show(10)

+----------+--------------+-----------+
|receipt_no|receipt_sub_no|sales_month|
+----------+--------------+-----------+
|       112|             1|         11|
|      1132|             2|         11|
|      1102|             1|         07|
|      1132|             1|         02|
|      1102|             2|         08|
|      1112|             1|         06|
|      1102|             2|         12|
|      1102|             1|         09|
|      1112|             2|         05|
|      1102|             1|         10|
+----------+--------------+-----------+
only showing top 10 rows



---
> P-051: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"日"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。なお、"日"は0埋め2桁で取り出すこと。データは10件を抽出すれば良い。

In [54]:
from pyspark.sql.functions import dayofmonth
df_receipt.select(
    "receipt_no",
    "receipt_sub_no",
    format_string("%02d",dayofmonth(from_unixtime("sales_epoch"))).alias("sales_day")
    ).show(10)

+----------+--------------+---------+
|receipt_no|receipt_sub_no|sales_day|
+----------+--------------+---------+
|       112|             1|       03|
|      1132|             2|       18|
|      1102|             1|       12|
|      1132|             1|       05|
|      1102|             2|       21|
|      1112|             1|       05|
|      1102|             2|       05|
|      1102|             1|       22|
|      1112|             2|       04|
|      1102|             1|       10|
+----------+--------------+---------+
only showing top 10 rows



---
> P-052: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計の上、売上金額合計に対して2000円以下を0、2000円超を1に2値化し、顧客ID、売上金額合計とともに10件表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。

In [55]:
def binalization(x):
    if(x>2000):
        return 1
    else:
        return 0
bi = udf(binalization)

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .groupby("customer_id")\
    .sum()\
    .select(
    "customer_id",
    "sum(amount)",
    bi(col("sum(amount)")).alias("does_purchase_over2000")
    ).show(10)

+--------------+-----------+----------------------+
|   customer_id|sum(amount)|does_purchase_over2000|
+--------------+-----------+----------------------+
|CS024415000195|       4638|                     1|
|CS011515000044|       2533|                     1|
|CS017415000245|       1289|                     0|
|CS011512000113|        686|                     0|
|CS025414000093|       3929|                     1|
|CS010605000007|       6420|                     1|
|CS011415000097|       3462|                     1|
|CS018515000090|       3756|                     1|
|CS015415000222|      11472|                     1|
|CS038605000003|       2833|                     1|
+--------------+-----------+----------------------+
only showing top 10 rows



---
> P-053: 顧客データフレーム（df_customer）の郵便番号（postal_cd）に対し、東京（先頭3桁が100〜209のもの）を1、それ以外のものを0に２値化せよ。さらにレシート明細データフレーム（df_receipt）と結合し、全期間において買い物実績のある顧客数を、作成した2値ごとにカウントせよ。

In [56]:
def binalization(x):
    if(100 <= int(x[0:3]) <= 209):
        return 1
    else:
        return 0
bi = udf(binalization)

df_customer.select(
    "postal_cd",
    bi(col("postal_cd")).alias("is_locate_tokyo")
    ).show(10)

+---------+---------------+
|postal_cd|is_locate_tokyo|
+---------+---------------+
| 259-1113|              0|
| 136-0076|              1|
| 151-0053|              1|
| 245-0016|              0|
| 144-0055|              1|
| 174-0065|              1|
| 136-0073|              1|
| 279-0003|              0|
| 136-0073|              1|
| 241-0823|              0|
+---------+---------------+
only showing top 10 rows



---
> P-054: 顧客データデータフレーム（df_customer）の住所（address）は、埼玉県、千葉県、東京都、神奈川県のいずれかとなっている。都道府県毎にコード値を作成し、顧客ID、住所とともに抽出せよ。値は埼玉県を11、千葉県を12、東京都を13、神奈川県を14とすること。結果は10件表示させれば良い。

In [57]:
def generate_code(x):
    if("埼玉県" in x):
        return "11"
    if("千葉県" in x):
        return "12"
    if("東京都" in x):
        return "13"
    if("神奈川県" in x):
        return "14"
gen = udf(generate_code)

df_customer.select(
    "customer_id",
    "address",
    gen(col("address")).alias("prefecture_cd")
    ).show(10)

+--------------+--------------------------------+-------------+
|   customer_id|                         address|prefecture_cd|
+--------------+--------------------------------+-------------+
|CS021313000114|  神奈川県伊勢原市粟窪**********|           14|
|CS037613000071|      東京都江東区南砂**********|           13|
|CS031415000172|    東京都渋谷区代々木**********|           13|
|CS028811000001|神奈川県横浜市泉区和泉町*****...|           14|
|CS001215000145|    東京都大田区仲六郷**********|           13|
|CS020401000016|      東京都板橋区若木**********|           13|
|CS015414000103|      東京都江東区北砂**********|           13|
|CS029403000008|      千葉県浦安市海楽**********|           12|
|CS015804000004|      東京都江東区北砂**********|           13|
|CS033513000180|神奈川県横浜市旭区善部町*****...|           14|
+--------------+--------------------------------+-------------+
only showing top 10 rows



---
> P-055: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、その合計金額の四分位点を求めよ。その上で、顧客ごとの売上金額合計に対して以下の基準でカテゴリ値を作成し、顧客ID、売上金額と合計ともに表示せよ。カテゴリ値は上から順に1〜4とする。結果は10件表示させれば良い。
>
> - 最小値以上第一四分位未満
> - 第一四分位以上第二四分位未満
> - 第二四分位以上第三四分位未満
> - 第三四分位以上

In [58]:
percentile = df_receipt.stat.approxQuantile("amount", [0.25,0.5,0.75], 0)
def generate_percentile(x):
    if x < percentile[0]:
        return 1
    elif percentile[0] <= x < percentile[1]:
        return 2
    elif percentile[1] <= x < percentile[2]:
        return 3
    elif percentile[2] <= x:
        return 4
    
gen = udf(generate_percentile)

df_receipt.select(
    "customer_id",
    "amount",
    gen(col("amount")).alias("percentile")
    ).show(10)

+--------------+------+----------+
|   customer_id|amount|percentile|
+--------------+------+----------+
|CS006214000001|   158|         2|
|CS008415000097|    81|         1|
|CS028414000014|   170|         3|
|ZZ000000000000|    25|         1|
|CS025415000050|    90|         1|
|CS003515000195|   138|         2|
|CS024514000042|    30|         1|
|CS040415000178|   128|         2|
|ZZ000000000000|   770|         4|
|CS027514000015|   680|         4|
+--------------+------+----------+
only showing top 10 rows



---
> P-056: 顧客データフレーム（df_customer）の年齢（age）をもとに10歳刻みで年代を算出し、顧客ID（customer_id）、生年月日（birth_day）とともに抽出せよ。ただし、60歳以上は全て60歳代とすること。年代を表すカテゴリ名は任意とする。先頭10件を表示させればよい。

In [59]:
def calc_era(x):
    if(x>=60):
        return 60
    else:
        return math.floor(x/10)*10
calc_era_udf = udf(calc_era)

df_customer.select(
    "customer_id",
    "birth_day",
    "age",
    calc_era_udf(col("age")).alias("era")
    ).show(10)

+--------------+----------+---+---+
|   customer_id| birth_day|age|era|
+--------------+----------+---+---+
|CS021313000114|1981-04-29| 37| 30|
|CS037613000071|1952-04-01| 66| 60|
|CS031415000172|1976-10-04| 42| 40|
|CS028811000001|1933-03-27| 86| 60|
|CS001215000145|1995-03-29| 24| 20|
|CS020401000016|1974-09-15| 44| 40|
|CS015414000103|1977-08-09| 41| 40|
|CS029403000008|1973-08-17| 45| 40|
|CS015804000004|1931-05-02| 87| 60|
|CS033513000180|1962-07-11| 56| 50|
+--------------+----------+---+---+
only showing top 10 rows



---
> P-057: 前問題の抽出結果と性別（gender）を組み合わせ、新たに性別×年代の組み合わせを表すカテゴリデータを作成せよ。組み合わせを表すカテゴリの値は任意とする。先頭10件を表示させればよい。

In [60]:
def generate_row(x,y):
    return f"{x},{y}"
gen = udf(generate_row)

df_customer.select(
    "customer_id",
    "birth_day",
    "age",
    "gender",
    calc_era_udf(col("age")).alias("era"),
    gen(col("gender"),calc_era_udf(col("age"))).alias("gender+era")
    ).show(10)

+--------------+----------+---+------+---+----------+
|   customer_id| birth_day|age|gender|era|gender+era|
+--------------+----------+---+------+---+----------+
|CS021313000114|1981-04-29| 37|  女性| 30|   女性,30|
|CS037613000071|1952-04-01| 66|  不明| 60|   不明,60|
|CS031415000172|1976-10-04| 42|  女性| 40|   女性,40|
|CS028811000001|1933-03-27| 86|  女性| 60|   女性,60|
|CS001215000145|1995-03-29| 24|  女性| 20|   女性,20|
|CS020401000016|1974-09-15| 44|  男性| 40|   男性,40|
|CS015414000103|1977-08-09| 41|  女性| 40|   女性,40|
|CS029403000008|1973-08-17| 45|  男性| 40|   男性,40|
|CS015804000004|1931-05-02| 87|  男性| 60|   男性,60|
|CS033513000180|1962-07-11| 56|  女性| 50|   女性,50|
+--------------+----------+---+------+---+----------+
only showing top 10 rows



---
> P-058: 顧客データフレーム（df_customer）の性別コード（gender_cd）をダミー変数化し、顧客ID（customer_id）とともに抽出せよ。結果は10件表示させれば良い。

In [61]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.functions import vector_to_array

encoder = OneHotEncoder(inputCol="gender_cd",
                        outputCol="genderVec")
model = encoder.fit(df_customer)
encoded = model.transform(df_customer)

encoded.withColumn("gender_dummy", vector_to_array("genderVec"))\
    .select(
    "customer_id",
    "gender",
    col("gender_dummy")[0],
    col("gender_dummy")[1]
    ).show(10)

+--------------+------+---------------+---------------+
|   customer_id|gender|gender_dummy[0]|gender_dummy[1]|
+--------------+------+---------------+---------------+
|CS021313000114|  女性|            0.0|            1.0|
|CS037613000071|  不明|            0.0|            0.0|
|CS031415000172|  女性|            0.0|            1.0|
|CS028811000001|  女性|            0.0|            1.0|
|CS001215000145|  女性|            0.0|            1.0|
|CS020401000016|  男性|            1.0|            0.0|
|CS015414000103|  女性|            0.0|            1.0|
|CS029403000008|  男性|            1.0|            0.0|
|CS015804000004|  男性|            1.0|            0.0|
|CS033513000180|  女性|            0.0|            1.0|
+--------------+------+---------------+---------------+
only showing top 10 rows



---
> P-059: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を平均0、標準偏差1に標準化して顧客ID、売上金額合計とともに表示せよ。標準化に使用する標準偏差は、不偏標準偏差と標本標準偏差のどちらでも良いものとする。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [62]:
from pyspark.sql.functions import stddev

avg = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .avg()\
    .first()[0]

std = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .agg(stddev("sum(amount)"))\
    .first()[0]

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .withColumn("sum(amount)_scaled",(col("sum(amount)")-avg)/std)\
    .show(10)

+--------------+-----------+--------------------+
|   customer_id|sum(amount)|  sum(amount)_scaled|
+--------------+-----------+--------------------+
|CS024415000195|       4638|  0.7682867318900658|
|CS011515000044|       2533|-0.00541859639243...|
|CS017415000245|       1289| -0.4626582298287443|
|CS011512000113|        686| -0.6842944830137078|
|CS025414000093|       3929|   0.507689545309404|
|CS010605000007|       6420|   1.423271480108316|
|CS011415000097|       3462| 0.33604090478307247|
|CS018515000090|       3756|  0.4441023615598208|
|CS015415000222|      11472|   3.280164267986318|
|CS038605000003|       2833|     0.1048481962369|
+--------------+-----------+--------------------+
only showing top 10 rows



---
> P-060: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を最小値0、最大値1に正規化して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [63]:
max_amount = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .max()\
    .first()[0]

min_amount = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .min()\
    .first()[0]

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .withColumn("sum(amount)_scaled",(col("sum(amount)")-min_amount)/(max_amount-min_amount))\
    .show(10)

+--------------+-----------+--------------------+
|   customer_id|sum(amount)|  sum(amount)_scaled|
+--------------+-----------+--------------------+
|CS024415000195|       4638| 0.19847062912756344|
|CS011515000044|       2533| 0.10701251303441084|
|CS017415000245|       1289|0.052963156065345844|
|CS011512000113|        686|  0.0267639902676399|
|CS025414000093|       3929|  0.1676659714980883|
|CS010605000007|       6420|  0.2758950295446646|
|CS011415000097|       3462| 0.14737573861661452|
|CS018515000090|       3756| 0.16014946124435175|
|CS015415000222|      11472|  0.4953945081682308|
|CS038605000003|       2833| 0.12004692387904067|
+--------------+-----------+--------------------+
only showing top 10 rows



---
> P-061: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を常用対数化（底=10）して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [64]:
from pyspark.sql.functions import log10

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .withColumn("sum(amount)_log10",log10(col("sum(amount)")))\
    .show(10)

+--------------+-----------+------------------+
|   customer_id|sum(amount)| sum(amount)_log10|
+--------------+-----------+------------------+
|CS024415000195|       4638|3.6663307443019684|
|CS011515000044|       2533| 3.403635189790548|
|CS017415000245|       1289| 3.110252917353403|
|CS011512000113|        686|2.8363241157067516|
|CS025414000093|       3929| 3.594282028811806|
|CS010605000007|       6420| 3.807535028068853|
|CS011415000097|       3462| 3.539327063539375|
|CS018515000090|       3756|3.5747255835940734|
|CS015415000222|      11472| 4.059639138323725|
|CS038605000003|       2833| 3.452246574520437|
+--------------+-----------+------------------+
only showing top 10 rows



---
> P-062: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を自然対数化(底=e）して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [65]:
from pyspark.sql.functions import log

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .withColumn("sum(amount)_log",log(col("sum(amount)")))\
    .show(10)

+--------------+-----------+-----------------+
|   customer_id|sum(amount)|  sum(amount)_log|
+--------------+-----------+-----------------+
|CS024415000195|       4638|8.442038517815478|
|CS011515000044|       2533|7.837159650001675|
|CS017415000245|       1289|7.161622002939187|
|CS011512000113|        686|6.530877627725885|
|CS025414000093|       3929| 8.27614021955846|
|CS010605000007|       6420|8.767173396684006|
|CS011415000097|       3462|8.149601735736155|
|CS018515000090|       3756|8.231109840328154|
|CS015415000222|      11472|9.347664562839402|
|CS038605000003|       2833|7.949091499830517|
+--------------+-----------+-----------------+
only showing top 10 rows



---
> P-063: 商品データフレーム（df_product）の単価（unit_price）と原価（unit_cost）から、各商品の利益額を算出せよ。結果は10件表示させれば良い。

In [66]:
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost",
    (col("unit_price")-col("unit_cost")).alias("profit")
    ).show(10)

+----------+----------+---------+------+
|product_cd|unit_price|unit_cost|profit|
+----------+----------+---------+------+
|P040101001|       198|      149|    49|
|P040101002|       218|      164|    54|
|P040101003|       230|      173|    57|
|P040101004|       248|      186|    62|
|P040101005|       268|      201|    67|
|P040101006|       298|      224|    74|
|P040101007|       338|      254|    84|
|P040101008|       420|      315|   105|
|P040101009|       498|      374|   124|
|P040101010|       580|      435|   145|
+----------+----------+---------+------+
only showing top 10 rows



---
> P-064: 商品データフレーム（df_product）の単価（unit_price）と原価（unit_cost）から、各商品の利益率の全体平均を算出せよ。
ただし、単価と原価にはNULLが存在することに注意せよ。

In [67]:
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost"
    ).na.drop("any", subset=["unit_price","unit_cost"])\
    .withColumn("profit_rate", (col("unit_price")-col("unit_cost"))/col("unit_price"))\
    .groupby()\
    .avg()\
    .show()

+------------------+-----------------+-------------------+
|   avg(unit_price)|   avg(unit_cost)|   avg(profit_rate)|
+------------------+-----------------+-------------------+
|402.57557617479796|302.1894642322658|0.24911389885176904|
+------------------+-----------------+-------------------+



---
> P-065: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。ただし、1円未満は切り捨てること。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [68]:
import numpy as np

def my_floor(x):
    return int(np.floor(x)) #udfでnumpy型の返り値はサポートされていない

floor_udf = udf(my_floor)
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost",
    floor_udf(col("unit_cost")/0.7).alias("valid_price")
).show(10)

+----------+----------+---------+-----------+
|product_cd|unit_price|unit_cost|valid_price|
+----------+----------+---------+-----------+
|P040101001|       198|      149|        212|
|P040101002|       218|      164|        234|
|P040101003|       230|      173|        247|
|P040101004|       248|      186|        265|
|P040101005|       268|      201|        287|
|P040101006|       298|      224|        320|
|P040101007|       338|      254|        362|
|P040101008|       420|      315|        450|
|P040101009|       498|      374|        534|
|P040101010|       580|      435|        621|
+----------+----------+---------+-----------+
only showing top 10 rows



---
> P-066: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。今回は、1円未満を四捨五入すること（0.5については偶数方向の丸めで良い）。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [69]:
def my_round(x):
    return int(np.round(x))

round_udf = udf(my_round)
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost",
    round_udf(col("unit_cost")/0.7).alias("valid_price")
).show(10)

+----------+----------+---------+-----------+
|product_cd|unit_price|unit_cost|valid_price|
+----------+----------+---------+-----------+
|P040101001|       198|      149|        213|
|P040101002|       218|      164|        234|
|P040101003|       230|      173|        247|
|P040101004|       248|      186|        266|
|P040101005|       268|      201|        287|
|P040101006|       298|      224|        320|
|P040101007|       338|      254|        363|
|P040101008|       420|      315|        450|
|P040101009|       498|      374|        534|
|P040101010|       580|      435|        621|
+----------+----------+---------+-----------+
only showing top 10 rows



---
> P-067: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。今回は、1円未満を切り上げること。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [70]:
def my_ceil(x):
    return int(np.ceil(x))

ceil_udf = udf(my_ceil)
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost",
    ceil_udf(col("unit_cost")/0.7).alias("valid_price")
).show(10)

+----------+----------+---------+-----------+
|product_cd|unit_price|unit_cost|valid_price|
+----------+----------+---------+-----------+
|P040101001|       198|      149|        213|
|P040101002|       218|      164|        235|
|P040101003|       230|      173|        248|
|P040101004|       248|      186|        266|
|P040101005|       268|      201|        288|
|P040101006|       298|      224|        320|
|P040101007|       338|      254|        363|
|P040101008|       420|      315|        451|
|P040101009|       498|      374|        535|
|P040101010|       580|      435|        622|
+----------+----------+---------+-----------+
only showing top 10 rows



---
> P-068: 商品データフレーム（df_product）の各商品について、消費税率10%の税込み金額を求めよ。 1円未満の端数は切り捨てとし、結果は10件表示すれば良い。ただし、単価（unit_price）にはNULLが存在することに注意せよ。

In [71]:
def my_floor(x):
    return int(np.floor(x))

floor_udf = udf(my_floor)
df_product.select(
    "product_cd",
    "unit_price",
    "unit_cost",
    floor_udf(col("unit_cost")*1.1).alias("price_add_tax")
).show(10)

+----------+----------+---------+-------------+
|product_cd|unit_price|unit_cost|price_add_tax|
+----------+----------+---------+-------------+
|P040101001|       198|      149|          163|
|P040101002|       218|      164|          180|
|P040101003|       230|      173|          190|
|P040101004|       248|      186|          204|
|P040101005|       268|      201|          221|
|P040101006|       298|      224|          246|
|P040101007|       338|      254|          279|
|P040101008|       420|      315|          346|
|P040101009|       498|      374|          411|
|P040101010|       580|      435|          478|
+----------+----------+---------+-------------+
only showing top 10 rows



---
> P-069: レシート明細データフレーム（df_receipt）と商品データフレーム（df_product）を結合し、顧客毎に全商品の売上金額合計と、カテゴリ大区分（category_major_cd）が"07"（瓶詰缶詰）の売上金額合計を計算の上、両者の比率を求めよ。抽出対象はカテゴリ大区分"07"（瓶詰缶詰）の購入実績がある顧客のみとし、結果は10件表示させればよい。

In [72]:
df_all = df_receipt.join(df_product, "product_cd", "inner")\
    .groupby("customer_id")\
    .sum()\
    .select(
        "customer_id",
        col("sum(amount)").alias("amount_all")
    )

df_07 = df_receipt.join(df_product.where("category_major_cd == '07'"), "product_cd", "inner")\
    .groupby("customer_id")\
    .sum()\
    .select(
        "customer_id",
        col("sum(amount)").alias("amount_07")
    )

df_all.join(df_07, "customer_id", "inner")\
    .withColumn("ratio_07", col("amount_07")/col("amount_all"))\
    .show(10)

+--------------+----------+---------+-------------------+
|   customer_id|amount_all|amount_07|           ratio_07|
+--------------+----------+---------+-------------------+
|CS001415000301|       188|      188|                1.0|
|CS002411000018|      1054|      326| 0.3092979127134725|
|CS010605000007|      6420|     3946| 0.6146417445482866|
|CS011415000097|      3462|     1436|  0.414789139225881|
|CS011515000044|      2533|     1064|0.42005527043031976|
|CS015415000222|     11472|     9314| 0.8118898186889819|
|CS017415000245|      1289|      607| 0.4709076803723817|
|CS017514000012|      4699|      196| 0.0417110023409236|
|CS017514000103|       348|       50|0.14367816091954022|
|CS018515000090|      3756|     1594|0.42438764643237487|
+--------------+----------+---------+-------------------+
only showing top 10 rows



---
> P-070: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過日数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い。

In [73]:
from pyspark.sql.functions import datediff
df_receipt.join(df_customer, "customer_id", "inner")\
    .select("customer_id","sales_ymd","application_date")\
    .distinct()\
    .select(
        "customer_id",
        to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("sales_ymd"),
        to_date(col("application_date").cast(StringType()), "yyyyMMdd").alias("application_date")
    ).withColumn("elapsed_date",datediff(col("sales_ymd"),col("application_date")))\
    .show(10)

+--------------+----------+----------------+------------+
|   customer_id| sales_ymd|application_date|elapsed_date|
+--------------+----------+----------------+------------+
|CS019514000062|2018-12-30|      2015-02-18|        1411|
|CS035515000170|2019-05-09|      2015-04-01|        1499|
|CS034412000216|2018-06-21|      2016-05-16|         766|
|CS024314000023|2019-09-17|      2015-05-20|        1581|
|CS032515000080|2018-11-15|      2015-09-26|        1146|
|CS026314000055|2017-02-22|      2015-10-11|         500|
|CS015414000100|2017-06-06|      2015-05-06|         762|
|CS008415000059|2017-10-12|      2015-10-01|         742|
|CS034513000219|2019-01-11|      2015-05-20|        1332|
|CS024414000101|2019-01-14|      2015-09-07|        1225|
+--------------+----------+----------------+------------+
only showing top 10 rows



---
> P-071: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過月数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い。1ヶ月未満は切り捨てること。

In [74]:
from pyspark.sql.functions import months_between
from pyspark.sql.types import IntegerType

df_receipt.join(df_customer, "customer_id", "inner")\
    .select("customer_id","sales_ymd","application_date")\
    .distinct()\
    .select(
        "customer_id",
        to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("sales_ymd"),
        to_date(col("application_date").cast(StringType()), "yyyyMMdd").alias("application_date")
    ).withColumn("elapsed_month", months_between(col("sales_ymd"),col("application_date")).cast(IntegerType()))\
    .show(10)

+--------------+----------+----------------+-------------+
|   customer_id| sales_ymd|application_date|elapsed_month|
+--------------+----------+----------------+-------------+
|CS019514000062|2018-12-30|      2015-02-18|           46|
|CS035515000170|2019-05-09|      2015-04-01|           49|
|CS034412000216|2018-06-21|      2016-05-16|           25|
|CS024314000023|2019-09-17|      2015-05-20|           51|
|CS032515000080|2018-11-15|      2015-09-26|           37|
|CS026314000055|2017-02-22|      2015-10-11|           16|
|CS015414000100|2017-06-06|      2015-05-06|           25|
|CS008415000059|2017-10-12|      2015-10-01|           24|
|CS034513000219|2019-01-11|      2015-05-20|           43|
|CS024414000101|2019-01-14|      2015-09-07|           40|
+--------------+----------+----------------+-------------+
only showing top 10 rows



---
> P-072: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過年数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い。1年未満は切り捨てること。

In [75]:
from dateutil.relativedelta import relativedelta

def diff_year(x,y):
    return relativedelta(x,y).years
diff_year_udf = udf(diff_year)

df_receipt.join(df_customer, "customer_id", "inner")\
    .select("customer_id","sales_ymd","application_date")\
    .distinct()\
    .select(
        "customer_id",
        to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("sales_ymd"),
        to_date(col("application_date").cast(StringType()), "yyyyMMdd").alias("application_date")
    ).withColumn("elapsed_year", diff_year_udf(col("sales_ymd"),col("application_date")))\
    .show(10)

+--------------+----------+----------------+------------+
|   customer_id| sales_ymd|application_date|elapsed_year|
+--------------+----------+----------------+------------+
|CS019514000062|2018-12-30|      2015-02-18|           3|
|CS035515000170|2019-05-09|      2015-04-01|           4|
|CS034412000216|2018-06-21|      2016-05-16|           2|
|CS024314000023|2019-09-17|      2015-05-20|           4|
|CS032515000080|2018-11-15|      2015-09-26|           3|
|CS026314000055|2017-02-22|      2015-10-11|           1|
|CS015414000100|2017-06-06|      2015-05-06|           2|
|CS008415000059|2017-10-12|      2015-10-01|           2|
|CS034513000219|2019-01-11|      2015-05-20|           3|
|CS024414000101|2019-01-14|      2015-09-07|           3|
+--------------+----------+----------------+------------+
only showing top 10 rows



---
> P-073: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からのエポック秒による経過時間を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値、application_dateは文字列でデータを保持している点に注意）。なお、時間情報は保有していないため各日付は0時0分0秒を表すものとする。

In [76]:
from pyspark.sql.functions import unix_timestamp

df_receipt.join(df_customer, "customer_id", "inner")\
    .select("customer_id","sales_ymd","application_date")\
    .distinct()\
    .select(
        "customer_id",
        to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("sales_ymd"),
        to_date(col("application_date").cast(StringType()), "yyyyMMdd").alias("application_date")
    ).withColumn("elapsed_seconds", unix_timestamp("sales_ymd") - unix_timestamp("application_date"))\
    .show(10)

+--------------+----------+----------------+---------------+
|   customer_id| sales_ymd|application_date|elapsed_seconds|
+--------------+----------+----------------+---------------+
|CS019514000062|2018-12-30|      2015-02-18|      121910400|
|CS035515000170|2019-05-09|      2015-04-01|      129513600|
|CS034412000216|2018-06-21|      2016-05-16|       66182400|
|CS024314000023|2019-09-17|      2015-05-20|      136598400|
|CS032515000080|2018-11-15|      2015-09-26|       99014400|
|CS026314000055|2017-02-22|      2015-10-11|       43200000|
|CS015414000100|2017-06-06|      2015-05-06|       65836800|
|CS008415000059|2017-10-12|      2015-10-01|       64108800|
|CS034513000219|2019-01-11|      2015-05-20|      115084800|
|CS024414000101|2019-01-14|      2015-09-07|      105840000|
+--------------+----------+----------------+---------------+
only showing top 10 rows



---
> P-074: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、当該週の月曜日からの経過日数を計算し、売上日、当該週の月曜日付とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値でデータを保持している点に注意）。

In [77]:
from pyspark.sql.functions import dayofweek, next_day, date_sub

df_receipt.join(df_customer, "customer_id", "inner")\
    .select("customer_id","sales_ymd","application_date")\
    .distinct()\
    .select(
        "customer_id",
        to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd").alias("sales_ymd"),
    ).withColumn("monday", date_sub(next_day(col("sales_ymd"),"Mon"), 7))\
    .withColumn("dayofweek", (dayofweek("sales_ymd")+5)%7)\
    .show(10)

+--------------+----------+----------+---------+
|   customer_id| sales_ymd|    monday|dayofweek|
+--------------+----------+----------+---------+
|CS019514000062|2018-12-30|2018-12-24|        6|
|CS035515000170|2019-05-09|2019-05-06|        3|
|CS034412000216|2018-06-21|2018-06-18|        3|
|CS024314000023|2019-09-17|2019-09-16|        1|
|CS032515000080|2018-11-15|2018-11-12|        3|
|CS026314000055|2017-02-22|2017-02-20|        2|
|CS015414000100|2017-06-06|2017-06-05|        1|
|CS008415000059|2017-10-12|2017-10-09|        3|
|CS034513000219|2019-01-11|2019-01-07|        4|
|CS024414000101|2019-01-14|2019-01-14|        0|
+--------------+----------+----------+---------+
only showing top 10 rows



---
> P-075: 顧客データフレーム（df_customer）からランダムに1%のデータを抽出し、先頭から10件データを抽出せよ。

In [78]:
df_customer.sample(True, 0.01).show(10)

+--------------+-------------+---------+------+----------+---+---------+---------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender| birth_day|age|postal_cd|                          address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+----------+---+---------+---------------------------------+--------------------+----------------+------------+
|CS022513000105|  島村 貴美子|        1|  女性|1962-03-12| 57| 249-0002|   神奈川県逗子市山の根**********|              S14022|        20150320|A-20091115-7|
|CS028415000057|      山内 瞬|        1|  女性|1975-08-26| 43| 246-0035|神奈川県横浜市瀬谷区下瀬谷****...|              S14028|        20150518|F-20100908-F|
|CS004215000068|  田辺 みゆき|        1|  女性|1992-07-29| 26| 167-0022|     東京都杉並区下井草**********|              S13004|        20160502|1-20080723-1|
|CS015415000209|    大谷 倫子|        1|  女性|1970-11-25| 48| 136-0076|       東京都江東区南砂**********|           

In [79]:
print(df_customer.count())
print(df_customer.sample(False, 0.01).count())

21971
242


---
> P-076: 顧客データフレーム（df_customer）から性別（gender_cd）の割合に基づきランダムに10%のデータを層化抽出データし、性別ごとに件数を集計せよ。

In [80]:
df_customer.groupby("gender_cd")\
    .count()\
    .withColumn("rario", col("count")/df_customer.count())\
    .show()

+---------+-----+-------------------+
|gender_cd|count|              rario|
+---------+-----+-------------------+
|        1|17918| 0.8155295616949615|
|        9| 1072|0.04879158891265759|
|        0| 2981|0.13567884939238087|
+---------+-----+-------------------+



In [270]:
columns = df_customer.columns
fraction = {1:0.1, 9:0.1, 0:0.1}

sample = df_customer.sampleBy("gender_cd",fraction)
    
sample.groupby("gender_cd")\
    .count()\
    .show()

+---------+-----+
|gender_cd|count|
+---------+-----+
|        1| 1828|
|        9|   93|
|        0|  319|
+---------+-----+



---
> P-077: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客単位に合計し、合計した売上金額の外れ値を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、ここでは外れ値を平均から3σ以上離れたものとする。結果は10件表示させれば良い。

In [82]:
avg = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .avg()\
    .first()[0]

std = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .groupby()\
    .agg(stddev("sum(amount)"))\
    .first()[0]

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .withColumn("sum(amount)_scaled",(col("sum(amount)")-avg)/std)\
    .where("abs(`sum(amount)_scaled`) >= 3")\
    .show(10)

+--------------+-----------+------------------+
|   customer_id|sum(amount)|sum(amount)_scaled|
+--------------+-----------+------------------+
|CS015415000222|      11472| 3.280164267986318|
|CS038415000071|      12696|3.7300527819140052|
|CS026515000201|      11144|3.1596059080449117|
|CS006515000023|      18372| 5.816300498461024|
|CS013415000176|      12383|3.6150077616040655|
|CS010415000121|      11761| 3.386387944885911|
|CS015515000034|      15300| 4.687168541936633|
|CS032414000072|      16563| 5.151391738906134|
|CS008414000053|      12276| 3.575679272232936|
|CS009415000214|      13742| 4.114516332214953|
+--------------+-----------+------------------+
only showing top 10 rows



---
> P-078: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客単位に合計し、合計した売上金額の外れ値を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、ここでは外れ値を第一四分位と第三四分位の差であるIQRを用いて、「第一四分位数-1.5×IQR」よりも下回るもの、または「第三四分位数+1.5×IQR」を超えるものとする。結果は10件表示させれば良い。

In [83]:
percentile = df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .stat.approxQuantile("sum(amount)", [0.25,0.5,0.75], 0)
iqr = percentile[2] - percentile[0]
lower_bound = percentile[0] - iqr*1.5
upper_bound = percentile[2] + iqr*1.5

df_receipt.where(~col("customer_id").rlike("^Z"))\
    .select(
    "customer_id",
    "amount"
    ).groupby("customer_id")\
    .sum()\
    .where((col("sum(amount)") < lower_bound) | (upper_bound < col("sum(amount)")))\
    .show(10)

+--------------+-----------+
|   customer_id|sum(amount)|
+--------------+-----------+
|CS015415000222|      11472|
|CS038415000071|      12696|
|CS021514000045|       9741|
|CS026515000201|      11144|
|CS006515000023|      18372|
|CS013214000003|       8589|
|CS034415000196|       8656|
|CS023414000057|      10075|
|CS031414000048|      10538|
|CS018205000001|       8739|
+--------------+-----------+
only showing top 10 rows



---
> P-079: 商品データフレーム（df_product）の各項目に対し、欠損数を確認せよ。

In [84]:
for c in df_product.columns:
    print(f"{c}:{df_product.where(df_product[c].isNull()).count()}")

product_cd:0
category_major_cd:0
category_medium_cd:0
category_small_cd:0
unit_price:7
unit_cost:7


---
> P-080: 商品データフレーム（df_product）のいずれかの項目に欠損が発生しているレコードを全て削除した新たなdf_product_1を作成せよ。なお、削除前後の件数を表示させ、前設問で確認した件数だけ減少していることも確認すること。

In [85]:
print(df_product.count())
print(df_product.na.drop("any").count())

10030
10023


---
> P-081: 単価（unit_price）と原価（unit_cost）の欠損値について、それぞれの平均値で補完した新たなdf_product_2を作成せよ。なお、平均値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [94]:
mean = df_product.select("unit_price", "unit_cost")\
    .groupby()\
    .avg()\
    .first()

fill_values = {"unit_price":int(np.round(mean[0])), 
               "unit_cost":int(np.round(mean[1]))}

filled = df_product.na.fill(fill_values)

for c in filled.columns:
    print(f"{c}:{filled.where(filled[c].isNull()).count()}")

product_cd:0
category_major_cd:0
category_medium_cd:0
category_small_cd:0
unit_price:0
unit_cost:0


---
> P-082: 単価（unit_price）と原価（unit_cost）の欠損値について、それぞれの中央値で補完した新たなdf_product_3を作成せよ。なお、中央値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [101]:
fill_values = {"unit_price":int(np.round(df_product.stat.approxQuantile("unit_price", [0.5], 0)[0])), 
               "unit_cost":int(np.round(df_product.stat.approxQuantile("unit_cost", [0.5], 0)[0]))}

filled = df_product.na.fill(fill_values)

for c in filled.columns:
    print(f"{c}:{filled.where(filled[c].isNull()).count()}")

product_cd:0
category_major_cd:0
category_medium_cd:0
category_small_cd:0
unit_price:0
unit_cost:0


---
> P-083: 単価（unit_price）と原価（unit_cost）の欠損値について、各商品の小区分（category_small_cd）ごとに算出した中央値で補完した新たなdf_product_4を作成せよ。なお、中央値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [110]:
import pyspark.sql.functions as F
from pyspark.sql.functions import when
median_price = F.expr('percentile_approx(unit_price, 0.5)')
median_cost = F.expr('percentile_approx(unit_cost, 0.5)')
window = Window.partitionBy(df_product.category_small_cd)

filled = df_product.withColumn("median_price", median_price.over(window))\
    .withColumn("median_cost", median_price.over(window))\
    .withColumn("filled_price", when(col("unit_price").isNull(), 
                                     col("median_price")).otherwise(col("unit_price")))\
    .withColumn("filled_cost", when(col("unit_cost").isNull(), 
                                     col("median_cost")).otherwise(col("unit_price")))\

for c in filled.columns:
    print(f"{c}:{filled.where(filled[c].isNull()).count()}")

product_cd:0
category_major_cd:0
category_medium_cd:0
category_small_cd:0
unit_price:7
unit_cost:7
median_price:0
median_cost:0
filled_price:0
filled_cost:0


---
> P-084: 顧客データフレーム（df_customer）の全顧客に対し、全期間の売上金額に占める2019年売上金額の割合を計算せよ。ただし、販売実績のない場合は0として扱うこと。そして計算した割合が0超のものを抽出せよ。 結果は10件表示させれば良い。また、作成したデータにNAやNANが存在しないことを確認せよ。

In [125]:
df_sales_2019 = df_receipt.where("(20190101 <= sales_ymd) AND (sales_ymd <= 20191231)")\
    .groupby("customer_id")\
    .sum()\
    .select(
    "customer_id",
    col("sum(amount)").alias("amount_2019")
    )

df_sales_all = df_customer.join(df_receipt, "customer_id", "left")\
    .groupby("customer_id")\
    .sum()\
    .select(
    "customer_id",
    col("sum(amount)").alias("amount")
    )

rate = df_sales_all.join(df_sales_2019, "customer_id", "left")\
    .select(
    "*",
    (col("amount_2019")/col("amount")).alias("amount_rate")
    ).na.fill(0)

rate.where("amount_rate > 0").show(10)

+--------------+------+-----------+--------------------+
|   customer_id|amount|amount_2019|         amount_rate|
+--------------+------+-----------+--------------------+
|CS049513000008|  4450|       3320|  0.7460674157303371|
|CS024415000195|  4638|        894|  0.1927554980595084|
|CS017514000012|  4699|        273|0.058097467546286446|
|CS020515000105|  5283|       1302| 0.24645088018171493|
|CS017415000245|  1289|        970|   0.752521334367727|
|CS038605000003|  2833|       1878|  0.6629015178256266|
|CS015415000222| 11472|       4486|  0.3910390516039052|
|CS025414000093|  3929|       1595|  0.4059557139221176|
|CS002513000664|   398|        398|                 1.0|
|CS034515000045|  1105|        416|  0.3764705882352941|
+--------------+------+-----------+--------------------+
only showing top 10 rows



In [126]:
for c in rate.columns:
    print(f"{c}:{rate.where(rate[c].isNull()).count()}")

customer_id:0
amount:0
amount_2019:0
amount_rate:0


---
> P-085: 顧客データフレーム（df_customer）の全顧客に対し、郵便番号（postal_cd）を用いて経度緯度変換用データフレーム（df_geocode）を紐付け、新たなdf_customer_1を作成せよ。ただし、複数紐づく場合は経度（longitude）、緯度（latitude）それぞれ平均を算出すること。


In [153]:
df_tmp = df_customer.join(df_geocode.select("postal_cd", "longitude", "latitude"), "postal_cd", "inner")\
    .groupby("customer_id")\
    .avg()\
    .select(
    "customer_id",
    col("avg(longitude)").alias("longitude"),
    col("avg(latitude)").alias("latitude")
    )

df_customer_1 = df_customer.join(df_tmp, "customer_id", "inner")
df_customer_1.show(5)

+--------------+-------------+---------+------+----------+---+---------+----------------------------+--------------------+----------------+------------+---------+--------+
|   customer_id|customer_name|gender_cd|gender| birth_day|age|postal_cd|                     address|application_store_cd|application_date|   status_cd|longitude|latitude|
+--------------+-------------+---------+------+----------+---+---------+----------------------------+--------------------+----------------+------------+---------+--------+
|CS030612000044|    矢沢 夏希|        1|  女性|1955-04-10| 63| 273-0033|千葉県船橋市本郷町**********|              S12030|        20141022|0-00000000-0|139.95209|35.70791|
|CS030412000172|    戸田 美紀|        1|  女性|1973-04-26| 45| 273-0033|千葉県船橋市本郷町**********|              S12030|        20160319|5-20091018-6|139.95209|35.70791|
|CS007503000041|  綾小路 高史|        0|  男性|1964-04-20| 54| 285-0854|  千葉県佐倉市上座**********|              S12007|        20160214|0-00000000-0|140.16283|35.72408|
|CS0294130001

---
> P-086: 前設問で作成した緯度経度つき顧客データフレーム（df_customer_1）に対し、申込み店舗コード（application_store_cd）をキーに店舗データフレーム（df_store）と結合せよ。そして申込み店舗の緯度（latitude）・経度情報（longitude)と顧客の緯度・経度を用いて距離（km）を求め、顧客ID（customer_id）、顧客住所（address）、店舗住所（address）とともに表示せよ。計算式は簡易式で良いものとするが、その他精度の高い方式を利用したライブラリを利用してもかまわない。結果は10件表示すれば良い。

$$
緯度（ラジアン）：\phi \\
経度（ラジアン）：\lambda \\
距離L = 6371 * arccos(sin \phi_1 * sin \phi_2
+ cos \phi_1 * cos \phi_2 * cos(\lambda_1 − \lambda_2))
$$

In [155]:
df_store.columns

['store_cd',
 'store_name',
 'prefecture_cd',
 'prefecture',
 'address',
 'address_kana',
 'tel_no',
 'longitude',
 'latitude',
 'floor_area']

In [162]:
import math
def calc_distance(x1, y1, x2, y2):
    distance = 6371 * math.acos(math.sin(math.radians(y1)) * math.sin(math.radians(y2)) 
                       + math.cos(math.radians(y1)) * math.cos(math.radians(y2)) 
                            * math.cos(math.radians(x1) - math.radians(x2)))
    return distance

distance_udf = udf(calc_distance)

df_customer_1.join(df_store, df_customer_1["application_store_cd"] == df_store["store_cd"], "inner")\
    .select(
    "customer_id",
    df_customer_1["address"].alias("customer_address"),
    df_store["address"].alias("store_address"),
    (distance_udf(df_store["longitude"],df_store["latitude"],
                  df_customer_1["longitude"],df_customer_1["latitude"])).alias("distence")
    ).show(10)

+--------------+----------------------------+--------------------------+------------------+
|   customer_id|            customer_address|             store_address|          distence|
+--------------+----------------------------+--------------------------+------------------+
|CS030612000044|千葉県船橋市本郷町**********|    千葉県市川市八幡三丁目|3.0519536466934833|
|CS030412000172|千葉県船橋市本郷町**********|    千葉県市川市八幡三丁目|3.0519536466934833|
|CS007503000041|  千葉県佐倉市上座**********|        千葉県佐倉市上志津|1.6994898888167103|
|CS029413000181|  千葉県浦安市海楽**********|    千葉県浦安市東野一丁目| 0.804858125326335|
|CS029613000024|  千葉県浦安市北栄**********|    千葉県浦安市東野一丁目|1.6887555834989676|
|CS038605000003|  千葉県浦安市堀江**********|東京都江戸川区東葛西九丁目| 1.392726722198207|
|CS037412000177|  東京都江東区亀戸**********|    東京都江東区南砂一丁目|1.8500720271507314|
|CS015415000222|  東京都江東区千石**********|    東京都江東区南砂二丁目|   1.0363274460445|
|CS043413000039|東京都品川区北品川**********|  東京都品川区南品川三丁目| 1.190854249536111|
|CS032613000113|東京都大田区新蒲田**********|  東京都大田区仲六郷三丁目|1.1990433571364574|


---
> P-087:  顧客データフレーム（df_customer）では、異なる店舗での申込みなどにより同一顧客が複数登録されている。名前（customer_name）と郵便番号（postal_cd）が同じ顧客は同一顧客とみなし、1顧客1レコードとなるように名寄せした名寄顧客データフレーム（df_customer_u）を作成せよ。ただし、同一顧客に対しては売上金額合計が最も高いものを残すものとし、売上金額合計が同一もしくは売上実績の無い顧客については顧客ID（customer_id）の番号が小さいものを残すこととする。

In [193]:
w = Window().partitionBy("customer_name","postal_cd").orderBy(col("sum(amount)").desc(), col("customer_id"))

df_customer_u = df_receipt.groupby("customer_id")\
    .sum()\
    .join(df_customer, "customer_id", "right")\
    .withColumn("rn", row_number().over(w))\
    .where(col("rn") == 1)

print(df_customer.count())
print(df_customer_u.count())

21971
21941


---
> P-088: 前設問で作成したデータを元に、顧客データフレームに統合名寄IDを付与したデータフレーム（df_customer_n）を作成せよ。ただし、統合名寄IDは以下の仕様で付与するものとする。
>
> - 重複していない顧客：顧客ID（customer_id）を設定
> - 重複している顧客：前設問で抽出したレコードの顧客IDを設定

In [207]:
df_customer_n = df_customer.join(df_customer_u.withColumnRenamed("customer_id","integrated_id"), 
                                 ["customer_name", "postal_cd"], "inner")\
    .select(
    "customer_id",
    "integrated_id"
    )

print(df_customer_n.select("customer_id").distinct().count())
print(df_customer_n.select("integrated_id").distinct().count())

21971
21941


---
> P-閑話: df_customer_1, df_customer_nは使わないので削除する。

In [217]:
del df_customer_1
del df_customer_n

NameError: name 'df_customer_1' is not defined

---
> P-089: 売上実績のある顧客に対し、予測モデル構築のため学習用データとテスト用データに分割したい。それぞれ8:2の割合でランダムにデータを分割せよ。

In [220]:
df_tmp = df_customer.join(df_receipt, "customer_id", "inner")
split = df_tmp.randomSplit([0.8,0.2],seed=24)

print(split[0].count())
print(split[1].count())

52523
13159


---
> P-090: レシート明細データフレーム（df_receipt）は2017年1月1日〜2019年10月31日までのデータを有している。売上金額（amount）を月次で集計し、学習用に12ヶ月、テスト用に6ヶ月のモデル構築用データを3セット作成せよ。

In [229]:
df_receipt.printSchema()

root
 |-- sales_ymd: integer (nullable = true)
 |-- sales_epoch: integer (nullable = true)
 |-- store_cd: string (nullable = true)
 |-- receipt_no: integer (nullable = true)
 |-- receipt_sub_no: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_cd: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- amount: integer (nullable = true)



In [256]:
import pyspark.sql.functions as F
df_sales_month = df_receipt.select(
    "amount",
    year(to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd")).alias("year"),
    month(to_date(col("sales_ymd").cast(StringType()),"yyyyMMdd")).alias("month"),
    ).groupby("year","month")\
    .sum()\
    .sort("year","month")\
    .select("year", "month", "sum(amount)")

def split_df(df, train_size, test_size, slide_window, start_point):
    w = Window.orderBy("year","month")
    train_start = start_point * slide_window
    test_start = train_start + train_size + 1
    
    return_df = df.withColumn("index", F.row_number().over(w))
    train_df = return_df.where(col("index").between(train_start, test_start-1)).drop("index")
    test_df = return_df.where(col("index").between(test_start, test_start+test_size-1)).drop("index")
    return train_df, test_df


df_train_1, df_test_1 = split_df(df_sales_month, train_size=12, test_size=6, slide_window=6, start_point=0)
df_train_2, df_test_2 = split_df(df_sales_month, train_size=12, test_size=6, slide_window=6, start_point=1)
df_train_3, df_test_3 = split_df(df_sales_month, train_size=12, test_size=6, slide_window=6, start_point=2)

In [257]:
df_train_1.show()

+----+-----+-----------+
|year|month|sum(amount)|
+----+-----+-----------+
|2017|    1|     902056|
|2017|    2|     764413|
|2017|    3|     962945|
|2017|    4|     847566|
|2017|    5|     884010|
|2017|    6|     894242|
|2017|    7|     959205|
|2017|    8|     954836|
|2017|    9|     902037|
|2017|   10|     905739|
|2017|   11|     932157|
|2017|   12|     939654|
+----+-----+-----------+



In [258]:
df_test_1.show()

+----+-----+-----------+
|year|month|sum(amount)|
+----+-----+-----------+
|2018|    1|     944509|
|2018|    2|     864128|
|2018|    3|     946588|
|2018|    4|     937099|
|2018|    5|    1004438|
|2018|    6|    1012329|
+----+-----+-----------+



---
> P-091: 顧客データフレーム（df_customer）の各顧客に対し、売上実績のある顧客数と売上実績のない顧客数が1:1となるようにアンダーサンプリングで抽出せよ。

In [283]:
purchase = df_customer.join(df_receipt.groupby("customer_id").sum(),
                            "customer_id", "inner").count()

all_customer = df_customer.count() 
print(f"purchase:{purchase}, \nnot purchase:{all_customer-purchase}")

fraction = {True:1, False:purchase/(all_customer-purchase)}

sample = df_customer.join(df_receipt.groupby("customer_id").sum(),
                          "customer_id", "left")\
    .withColumn("does_purchase", ~col("sum(amount)").isNull())\
    .sampleBy("does_purchase", fraction)
    
sample.groupby("does_purchase")\
    .count()\
    .show(10)

purchase:8306, 
not purchase:13665
+-------------+-----+
|does_purchase|count|
+-------------+-----+
|         true| 8306|
|        false| 8273|
+-------------+-----+



---
> P-092: 顧客データフレーム（df_customer）では、性別に関する情報が非正規化の状態で保持されている。これを第三正規化せよ。

In [284]:
df_gender = df_customer.select("gender_cd","gender").distinct()
df_customer_s = df_customer.drop("gender")

In [285]:
df_gender.show(5)
df_customer_s.show(5)

+---------+------+
|gender_cd|gender|
+---------+------+
|        9|  不明|
|        0|  男性|
|        1|  女性|
+---------+------+

+--------------+-------------+---------+----------+---+---------+--------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd| birth_day|age|postal_cd|                         address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+----------+---+---------+--------------------------------+--------------------+----------------+------------+
|CS021313000114|  大野 あや子|        1|1981-04-29| 37| 259-1113|  神奈川県伊勢原市粟窪**********|              S14021|        20150905|0-00000000-0|
|CS037613000071|    六角 雅彦|        9|1952-04-01| 66| 136-0076|      東京都江東区南砂**********|              S13037|        20150414|0-00000000-0|
|CS031415000172|宇多田 貴美子|        1|1976-10-04| 42| 151-0053|    東京都渋谷区代々木**********|              S13031|        20150529|D-20100325-C|
|CS02881100000

---
> P-093: 商品データフレーム（df_product）では各カテゴリのコード値だけを保有し、カテゴリ名は保有していない。カテゴリデータフレーム（df_category）と組み合わせて非正規化し、カテゴリ名を保有した新たな商品データフレームを作成せよ。

---
> P-094: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

---
> P-095: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはCP932

---
> P-096: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ無し
> - 文字コードはUTF-8

---
> P-097: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

---
> P-098: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ無し
> - 文字コードはUTF-8

---
> P-099: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はTSV（タブ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

---
> P-100: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はTSV（タブ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

# これで１００本終わりです。おつかれさまでした！