In [1]:
%load_ext autoreload
%autoreload 2

import datetime
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from tqdm.notebook import tqdm
from tqdm.contrib import tzip, tenumerate, tmap

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import col
import pyspark.sql.types as pstype
import pyspark.sql.functions as F
import pyspark as ps

import matplotlib as mlt
import matplotlib.pyplot as plt
import japanize_matplotlib

from time_series_model import *

%matplotlib inline
%matplotlib ipympl

In [2]:
np.set_printoptions(threshold=100000, precision=4, linewidth=10000)
ps_conf = ps.SparkConf().set("spark.logConf", "false")\
            .set("spark.executor.memory", "12g")\
            .set("spark.driver.memory", "4g")\
            .set("spark.executor.cores", "7")\
            .set("spark.sql.shuffle.partitions", "500")\
            .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UseStringDeduplication")\
            .set("spark.eventLog.gcMetrics.youngGenerationGarbageCollectors", "G1 Young Generation")\
            .set("spark.eventLog.gcMetrics.oldGenerationGarbageCollectors", "G1 Old Generation")\
			.set("spark.logConf", "false")
spark = SparkSession.builder.config(conf=ps_conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/25 16:40:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
SPECIFIED_PATH = "csv_data/"
SPECIFIED_DATE = "20241120"
SPECIFIED_CSV  = SPECIFIED_PATH + SPECIFIED_DATE

In [4]:
input_path  = SPECIFIED_CSV + "_raw_data_sumida_bunka.csv"
df_raw_data = spark.read.option("inferSchema", "True").option("header", "True").csv(input_path)
df_raw_data.persist(StorageLevel.MEMORY_AND_DISK_DESER)

utid_list = sorted(df_raw_data.select("unit_id").drop_duplicates().rdd.flatMap(lambda x: x).collect())

24/11/25 16:40:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [5]:
df_raw_data\
		.filter(col("unit_id").isin(utid_list))\
		.filter(col("randomized") == 1)\
		.select(['date', 'datetime', 'unit_id', 'aibeaconid'])\
        .withColumn('minute', F.date_trunc('minute', 'datetime'))\
        .withColumn('minute', F.unix_timestamp('minute'))\
        .withColumn('minute', col('minute') - (col('minute') % 60))\
        .withColumn('minute', F.from_unixtime('minute'))\
        .select(['date', 'minute', 'unit_id', 'aibeaconid'])\
        .drop_duplicates()\
        .groupBy(['date', 'minute', 'unit_id'])\
        .agg(F.count('*').alias('1min_count'))\
        .orderBy(['date', 'minute', 'unit_id'])\
        .toPandas()\
        .to_csv(SPECIFIED_CSV + "_1min_data_sumida_bunka.csv", header=True, index=False)

24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/25 16:40:58 WARN RowBasedKeyValueBatch: Calling spill() on

In [6]:
SPECIFIED_PAST = (datetime.datetime.strptime(SPECIFIED_DATE, '%Y%m%d') - relativedelta(days=1) + datetime.timedelta(hours=9)).strftime('%Y-%m-%d')

start_time = f'{SPECIFIED_PAST} 00:00:00'
end_time = f'{SPECIFIED_PAST} 23:59:00'
# 1分単位の時間列を作成
time_range = pd.date_range(start=start_time, end=end_time, freq='min')
# DataFrameに変換
df_time = pd.DataFrame(time_range, columns=['datetime'])
df_time = spark.createDataFrame(df_time)\
				.withColumn('hour', F.hour(col('datetime')))

df_by1min = spark.read\
				.option('header', True)\
				.option('inferSchema', True)\
           		.csv(SPECIFIED_CSV + "_1min_data_sumida_bunka.csv")

for unit_id in utid_list:
    df_tmp  = df_by1min\
        		.filter(col('unit_id') == unit_id)\
          		.select(['minute', '1min_count'])\
            	.withColumnRenamed('minute',     'datetime')\
                .withColumnRenamed('1min_count', unit_id)
    df_time = df_time.join(df_tmp, on='datetime', how='left')
df_time = df_time\
			.fillna(0)\
    		.orderBy('datetime')
df_time.persist(StorageLevel.MEMORY_AND_DISK_DESER)
df_time.show()

24/11/25 16:41:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------------------+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|           datetime|hour|1002A25B|1002A25C|1002A25D|1002A5BA|1002A5BB|1002A5BD|1002A5BE|1002A5BF|1002A5C1|1002A5C3|1002A5C4|1002A5C5|1002A5C6|1002A5C7|1002A5C9|1002A5CA|1002A5CB|1002A5CC|1002A5CE|1002A5CF|1002A5D0|1002A5D1|1002A6A5|1002A6A7|1002A6A8|1002A6AF|1002A6B0|1002A6B2|1002A6B3|1002A6B6|1002A6B9|1002A6BA|1002A6BC|1002A6BD|1002A6BE|10

                                                                                

In [7]:
SPECIFIED_HOUR = 13

pd_data = df_time.toPandas()
pd_data = pd_data[pd_data['hour'] == SPECIFIED_HOUR]
pd_data = pd_data[utid_list]
pd_data

                                                                                

Unnamed: 0,1002A25B,1002A25C,1002A25D,1002A5BA,1002A5BB,1002A5BD,1002A5BE,1002A5BF,1002A5C1,1002A5C3,...,1002ABE5,1002ABE8,1002ABF2,1002ABF3,1002ACA9,1002ACAA,1002ACB0,1002ACB2,1002ACB8,1002ACB9
780,8,21,46,29,12,31,9,8,6,0,...,71,84,2,60,94,15,93,1,21,2
781,13,20,51,20,13,12,13,16,15,1,...,57,78,6,63,85,15,110,0,16,6
782,23,19,57,27,29,18,7,34,32,0,...,62,66,8,56,71,28,93,4,14,10
783,17,18,65,26,24,16,6,29,21,0,...,65,83,7,65,97,28,103,6,22,9
784,5,15,64,22,14,14,8,35,5,0,...,80,92,23,53,84,14,90,19,28,20
785,12,17,49,26,19,16,13,13,17,1,...,68,68,10,53,89,16,93,10,14,8
786,10,22,45,19,13,23,8,13,12,0,...,71,95,8,62,82,18,102,11,21,8
787,7,14,60,23,7,28,6,9,11,0,...,54,80,6,65,102,7,94,11,14,16
788,5,11,55,21,9,16,6,13,13,0,...,63,72,7,37,88,6,88,8,18,5
789,8,18,40,26,10,25,4,8,12,0,...,69,90,3,73,80,12,91,4,8,5


In [8]:
test_data = pd_data.values.tolist()
test_data

[[8,
  21,
  46,
  29,
  12,
  31,
  9,
  8,
  6,
  0,
  33,
  78,
  38,
  10,
  16,
  1,
  115,
  92,
  9,
  40,
  71,
  19,
  93,
  35,
  1,
  84,
  32,
  49,
  84,
  55,
  2,
  1,
  68,
  1,
  54,
  26,
  20,
  108,
  0,
  31,
  42,
  30,
  67,
  33,
  39,
  36,
  83,
  109,
  107,
  8,
  4,
  47,
  0,
  109,
  18,
  102,
  120,
  22,
  21,
  72,
  71,
  84,
  2,
  60,
  94,
  15,
  93,
  1,
  21,
  2],
 [13,
  20,
  51,
  20,
  13,
  12,
  13,
  16,
  15,
  1,
  28,
  79,
  33,
  11,
  13,
  12,
  102,
  98,
  15,
  41,
  63,
  19,
  86,
  26,
  9,
  64,
  32,
  49,
  90,
  54,
  7,
  4,
  68,
  7,
  45,
  31,
  16,
  97,
  1,
  43,
  43,
  28,
  57,
  34,
  29,
  27,
  81,
  106,
  123,
  11,
  3,
  51,
  0,
  87,
  22,
  98,
  97,
  22,
  15,
  63,
  57,
  78,
  6,
  63,
  85,
  15,
  110,
  0,
  16,
  6],
 [23,
  19,
  57,
  27,
  29,
  18,
  7,
  34,
  32,
  0,
  32,
  71,
  26,
  13,
  15,
  9,
  87,
  81,
  32,
  47,
  68,
  29,
  79,
  22,
  9,
  62,
  30,
  62,
  62,
  52,


In [156]:
model = Sparse_Vector_Auto_Regressive(test_data, norm_α=1, l1_ratio=0.02, tol=1e-10, learning_rate=0.0001, isStandardization=True)

lag = 4
model.fit(lags=lag, solver="external library", visible_flg=True)

x_tmp = np.array([np.array(test_data)[t-lag : t][::-1].ravel() for t in range(lag, len(test_data))])
y_tmp = test_data[lag:]
mean  = model.predict(x_tmp)

#print(f"alpha0:{model.alpha0}\nalpha:{model.alpha}", flush=True)
mse = np.sum((y_tmp - mean) ** 2) / len(y_tmp)
mse

平均二乗誤差(MSE): 11.394119644817446
L2正則化項(l2 norm): 6.93314049741681
L1正則化項(l1 norm): 219.8130501874836
目的関数(Objective):  755.4713415139875
目的関数(Objective)の微分:  4523.527159140646


np.float64(1478.032645015837)

In [157]:
model = Sparse_Vector_Auto_Regressive(test_data, norm_α=1, l1_ratio=0.02, tol=1e-10, learning_rate=0.0001, isStandardization=True)

lag = 4
model.fit(lags=lag, solver="coordinate descent", visible_flg=True)

x_tmp = np.array([np.array(test_data)[t-lag : t][::-1].ravel() for t in range(lag, len(test_data))])
y_tmp = test_data[lag:]
mean = model.predict(x_tmp)

# print(f"alpha0:{model.alpha0}\nalpha:{model.alpha}", flush=True)
mse = np.sum((y_tmp - mean) ** 2) / len(y_tmp)
mse

ite:1  mse:0.27230498159279587  ΔDiff:3.9050069102623324
ite:2  mse:0.06348722662366094  ΔDiff:1.8855462579647873
ite:3  mse:0.019136587691935207  ΔDiff:1.035204767545229
ite:4  mse:0.00840610403330714  ΔDiff:0.6861062788411135
ite:5  mse:0.004552298398698128  ΔDiff:0.5049046546894722
ite:6  mse:0.0027877603800732747  ΔDiff:0.3951133777589711
ite:7  mse:0.0018340604947227612  ΔDiff:0.32047993338815245
ite:8  mse:0.0013003268437461152  ΔDiff:0.26984866731148116
ite:9  mse:0.0009330207118263066  ΔDiff:0.2285807512943143
ite:10  mse:0.0007002066058396191  ΔDiff:0.19801911505462969
ite:11  mse:0.0005411665779984957  ΔDiff:0.17408425651941004
ite:12  mse:0.00042978417099278054  ΔDiff:0.15513836912767812
ite:13  mse:0.00034276306528677403  ΔDiff:0.1385450528025427
ite:14  mse:0.00027416803376583034  ΔDiff:0.12390887736916391
ite:15  mse:0.00022024388679209084  ΔDiff:0.1110570018520088
ite:16  mse:0.00017862797616258476  ΔDiff:0.10001583207225118
ite:17  mse:0.000146010973682565  ΔDiff:0.0904

np.float64(1478.0326450013001)

In [161]:
model = Sparse_Vector_Auto_Regressive(test_data, norm_α=1, l1_ratio=0.02, tol=1e-10, learning_rate=0.0001, max_iterate=30000, isStandardization=True)

lag = 4
model.fit(lags=lag, solver="ISTA", visible_flg=True)

x_tmp = np.array([np.array(test_data)[t-lag : t][::-1].ravel() for t in range(lag, len(test_data))])
y_tmp = test_data[lag:]
mean  = model.predict(x_tmp)

#print(f"alpha0:{model.alpha0}\nalpha:{model.alpha}", flush=True)
mse = np.sum((y_tmp - mean) ** 2) / len(y_tmp)
mse

ite:1  mse:76902.32892192036  update_diff:89414.08949356654
ite:5001  mse:4776.015129765225  update_diff:20497.271023689806
ite:10001  mse:69.23389588831712  update_diff:1933.559586826041
ite:15001  mse:11.551596438681344  update_diff:210.39196041100826
ite:20001  mse:11.394158029019783  update_diff:133.14248755971425
ite:25001  mse:11.394120188153888  update_diff:133.14177826903997
平均二乗誤差(MSE): 11.39421982164373
L2正則化項(l2 norm): 6.9332472298079795
L1正則化項(l1 norm): 219.80957741072038
目的関数(Objective):  755.4731856919623
目的関数(Objective)の微分:  4525.023074481462


np.float64(1478.0956238884937)

In [None]:
from sklearn.linear_model import ElasticNet

model = ElasticNet(alpha=1, l1_ratio=0.1, max_iter=1000000, tol=1e-3)

lag   = 4
x_tmp = np.array([np.array(test_data)[t-lag : t][::-1].ravel() for t in range(lag, len(test_data))])
y_tmp = test_data[lag:]
model.fit(x_tmp, y_tmp)
mean  = model.predict(x_tmp)

print(f"alpha0:{model.intercept_}\nalpha:{model.coef_}", flush=True)
mse = np.sum((y_tmp - mean) ** 2) / len(y_tmp)
mse