In [1]:
from pyspark.sql import SparkSession
import altair as alt
import pandas as pd

# Obtain the Spark session used by every cell below (an existing session is
# reused if one is already running, otherwise a new one is created).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [2]:
def load(name, base_dir="./work/tobi/parquet", preview_rows=2, session=None):
    """Load a parquet dataset, register it as a temp view and print a summary.

    The dataset is read from ``{base_dir}/{name}.parquet`` and registered as a
    temporary view called ``name`` so it can be queried through ``spark.sql``.
    The schema, the row count and the first ``preview_rows`` rows are printed.

    Args:
        name: Dataset name; used both as the file stem and as the view name.
        base_dir: Directory that contains the parquet datasets.
        preview_rows: Number of rows passed to ``DataFrame.show``.
        session: Spark session to read with. Defaults to the notebook-level
            ``spark`` session when omitted.

    Returns:
        The loaded Spark DataFrame.
    """
    # Resolve the global lazily so the default keeps working exactly as before.
    active_session = spark if session is None else session

    print(f"===================== {name} =====================")
    df = active_session.read.parquet(f"{base_dir}/{name}.parquet")
    df.createOrReplaceTempView(name)
    df.printSchema()
    # NOTE: count() triggers a full scan of the dataset.
    print("#Rows:", df.count())
    df.show(preview_rows)
    print()
    return df

In [3]:
# Load both datasets; load() also registers each one as a temp view under the
# same name, which the spark.sql queries below rely on.
activity_logs = load(name="activity_logs")
financial_journal = load(name="financial_journal")

root
 |-- timestamp: timestamp (nullable = true)
 |-- currentLocation: string (nullable = true)
 |-- participantId: integer (nullable = true)
 |-- currentMode: string (nullable = true)
 |-- hungerStatus: string (nullable = true)
 |-- sleepStatus: string (nullable = true)
 |-- apartmentId: integer (nullable = true)
 |-- availableBalance: float (nullable = true)
 |-- jobId: integer (nullable = true)
 |-- financialStatus: string (nullable = true)
 |-- dailyFoodBudget: float (nullable = true)
 |-- weeklyExtraBudget: float (nullable = true)
 |-- currentLocationX: float (nullable = true)
 |-- currentLocationY: float (nullable = true)
 |-- date: date (nullable = true)

#Rows: 113923735
+-------------------+--------------------+-------------+-----------+------------+-----------+-----------+----------------+-----+---------------+---------------+-----------------+----------------+----------------+----------+
|          timestamp|     currentLocation|participantId|currentMode|hungerStatus|sleepSt

In [4]:
# Preview timestamp, sleep status and participant for rows whose mode is 'AtHome'.
(
    activity_logs
    .select("timestamp", "sleepStatus", "participantId")
    .filter(activity_logs["currentMode"] == "AtHome")
    .show(5)
)

+-------------------+--------------+-------------+
|          timestamp|   sleepStatus|participantId|
+-------------------+--------------+-------------+
|2022-03-03 00:00:00|      Sleeping|            0|
|2022-03-03 00:00:00|      Sleeping|            2|
|2022-03-03 00:00:00|      Sleeping|            3|
|2022-03-03 00:00:00|      Sleeping|            4|
|2022-03-03 00:00:00|PrepareToSleep|            5|
+-------------------+--------------+-------------+
only showing top 5 rows



In [5]:
# Count rows per (timestamp, sleep, hunger, financial status) combination,
# ordered chronologically.
spark.sql(
    "SELECT timestamp, sleepStatus, hungerStatus, financialStatus, count(*) "
    "from activity_logs "
    "group by timestamp, sleepStatus, hungerStatus, financialStatus "
    "order by timestamp asc"
).show()

+-------------------+-----------+------------+---------------+--------+
|          timestamp|sleepStatus|hungerStatus|financialStatus|count(1)|
+-------------------+-----------+------------+---------------+--------+
|2022-03-01 00:00:00|      Awake|     JustAte|       Unstable|       3|
|2022-03-01 00:00:00|      Awake|     JustAte|        Unknown|      31|
|2022-03-01 00:00:00|   Sleeping|     JustAte|       Unstable|       1|
|2022-03-01 00:00:00|      Awake|     JustAte|         Stable|     196|
|2022-03-01 00:00:00|   Sleeping|     JustAte|         Stable|     669|
|2022-03-01 00:00:00|   Sleeping|     JustAte|        Unknown|     111|
|2022-03-01 00:05:00|      Awake|     JustAte|         Stable|     168|
|2022-03-01 00:05:00|   Sleeping|     JustAte|         Stable|     697|
|2022-03-01 00:05:00|   Sleeping|     JustAte|        Unknown|     113|
|2022-03-01 00:05:00|      Awake|     JustAte|       Unstable|       3|
|2022-03-01 00:05:00|      Awake|     JustAte|        Unknown|  

In [6]:
# Sum of all journal amounts per category, listed alphabetically by category.
spark.sql(
    "select category, sum(amount) "
    "from financial_journal "
    "group by category "
    "order by category asc"
).show()

+--------------+-------------------+
|      category|        sum(amount)|
+--------------+-------------------+
|     Education|-188635.79949443735|
|          Food| -4562517.832099259|
|    Recreation| -4973094.303666381|
|RentAdjustment|  54932.73219610953|
|       Shelter| -8982423.083224718|
|          Wage|5.563302183606546E7|
+--------------+-------------------+



In [7]:
# Horizontal bar chart of the summed amount per transaction category.
# The SQL ORDER BY alone does not control bar order: Altair sorts a nominal
# axis alphabetically by default, so the descending-total order is requested
# explicitly on the Y channel with sort='-x'.
alt.Chart(
    spark.sql(
        "select category, sum(amount) as total "
        "from financial_journal "
        "group by category "
        "order by total desc"
    ).toPandas()
).mark_bar().encode(
    alt.Y('category', type='nominal', sort='-x'),
    alt.X('total', type='quantitative'),
)

  for col_name, dtype in df.dtypes.iteritems():


In [8]:
# Number of distinct calendar dates that appear in the financial journal.
print(
    spark.sql(
        "select distinct date(timestamp) as d "
        "from financial_journal"
    ).count()
)

451


In [9]:
# Daily total amount per category, converted to pandas for plotting.
# The date column is turned into a pandas datetime so Altair treats it as temporal.
df = (
    spark.sql(
        "select date(timestamp) as date, category, sum(amount) as total "
        "from financial_journal "
        "group by 1, 2 "
        "order by 1 asc, 2 asc"
    )
    .toPandas()
    .assign(date=lambda frame: pd.to_datetime(frame["date"]))
)

# Interactive area chart of the daily totals, coloured by category.
alt.Chart(df).mark_area().encode(
    x=alt.X("date", title="Date"),
    y=alt.Y("total", title="Total"),
    color="category",
    tooltip=["date", "total", "category"],
).properties(
    title="Population financial transactions",
    width=1500,
    height=500,
).interactive()

  for col_name, dtype in df.dtypes.iteritems():


In [10]:
def _status_share_by_clocktime(status_column, participant_days):
    """Return the share of rows per time of day and status value.

    Rows of the ``activity_logs`` view are grouped by clock time (hour and
    minute only, so all days fall onto the same time-of-day axis) and by the
    value of ``status_column``. Each group's row count is divided by
    ``participant_days``.

    Args:
        status_column: Name of the ``activity_logs`` column to break down,
            e.g. ``"hungerStatus"``.
        participant_days: Denominator applied to every group count.

    Returns:
        A pandas DataFrame with the columns ``clocktime``, ``status`` and
        ``percentage`` (a fraction, not multiplied by 100).

    Raises:
        ValueError: If ``status_column`` is not a plain identifier; the name
            is interpolated into the SQL text, so anything else is rejected.
    """
    if not status_column.isidentifier():
        raise ValueError(f"Invalid column name: {status_column!r}")

    return spark.sql(f"""
        select to_timestamp(date_format(timestamp, "HH:mm"), "HH:mm") as clocktime,
               {status_column} as status,
               count(*) / {participant_days} as percentage
        from activity_logs
        group by 1, 2
    """).toPandas()


# Number of distinct (day, participant) pairs in the activity log; used as the
# denominator that turns per-clocktime row counts into population shares.
total = spark.sql("select distinct date_trunc('day', timestamp), participantId from activity_logs").count()

# One frame per status dimension, all built by the same query template.
hunger = _status_share_by_clocktime("hungerStatus", total)
sleep = _status_share_by_clocktime("sleepStatus", total)
financial = _status_share_by_clocktime("financialStatus", total)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [11]:
# Clean up the corrupt records before plotting: one row with status 'NA' and
# one with status 'Beco' (presumably a truncated status value).
hunger.drop(
    index=hunger.loc[hunger["status"].isin(["NA", "Beco"])].index,
    inplace=True,
)

# Legend/colour order, from most to least hungry.
sortOrder = ["Starving", "Hungry", "BecomingHungry", "JustAte", "BecameFull"]

# Area chart of the population share per hunger status over the time of day.
alt.Chart(hunger).mark_area().encode(
    x=alt.X("clocktime:T", axis=alt.Axis(format="%H:%M"), title="Time of day"),
    y=alt.Y("percentage:Q", axis=alt.Axis(format="%"), title="Population"),
    color=alt.Color(
        "status",
        title="Status",
        sort=sortOrder,
        scale=alt.Scale(
            domain=sortOrder,
            range=["darkred", "orange", "gold", "steelblue", "green"],
        ),
    ),
    tooltip=[
        alt.Tooltip("clocktime:T", format="%H:%M"),
        alt.Tooltip("percentage", format=".2%"),
        "status",
    ],
).properties(title="Hunger status of the population", width=1000)

  for col_name, dtype in df.dtypes.iteritems():


In [12]:
# Clean up the 2 corrupt records (status 'NA') before plotting.
sleep.drop(index=sleep.loc[sleep["status"].eq("NA")].index, inplace=True)

# Legend/colour order for the sleep states.
sortOrder = ["Awake", "PrepareToSleep", "Sleeping"]

# Area chart of the population share per sleep status over the time of day.
alt.Chart(sleep).mark_area().encode(
    x=alt.X("clocktime:T", axis=alt.Axis(format="%H:%M"), title="Time of day"),
    y=alt.Y("percentage:Q", axis=alt.Axis(format="%"), title="Population"),
    color=alt.Color(
        "status",
        title="Status",
        sort=sortOrder,
        scale=alt.Scale(domain=sortOrder, range=["coral", "yellow", "steelblue"]),
    ),
    tooltip=[
        alt.Tooltip("clocktime:T", format="%H:%M"),
        alt.Tooltip("percentage", format=".2%"),
        "status",
    ],
).properties(title="Sleep status of the population", width=1000)

In [13]:
# Order for the financial status legend. The original cell reused `sortOrder`,
# which at this point still holds the sleep states from the previous cell and
# matches no financial status value. The values below are the ones visible in
# the activity_logs output; any other value is placed after them by the
# chart's sort.
financialSortOrder = ["Stable", "Unstable", "Unknown"]

# Area chart of the population share per financial status over the time of day.
alt.Chart(financial).mark_area().encode(
    x = alt.X('clocktime:T', title="Time of day", axis=alt.Axis(format="%H:%M")),
    y = alt.Y('percentage:Q', title="Population", axis=alt.Axis(format='%')),
    color = alt.Color('status', title="Status", sort=financialSortOrder),
    tooltip = [alt.Tooltip('clocktime:T', format="%H:%M"), alt.Tooltip('percentage', format=".2%"), 'status']
).properties(width=1000, title='Financial status of the population')