Import necessary packages

In [1]:
import boto3
import os
import glob
import shutil
import pyarrow.csv as csv
import pyarrow.parquet as pq
import pyarrow as pa
from io import BytesIO

Read in parquet files from S3 and store them as separate tables

In [2]:
import boto3
import pyarrow.parquet as pq
from io import BytesIO

# Generate Boto3 S3 client
s3_client = boto3.client('s3')

bucket_name = 'noturs'  
parquet_folder = 'rplace-parquet/' 
# Function to read Parquet files from S3 bucket
def read_parquet_from_s3(bucket, folder, file_name):
    response = s3_client.get_object(Bucket=bucket, Key=folder + file_name)
    parquet_file = pq.read_table(BytesIO(response['Body'].read()))
    return parquet_file


parquet_files = [
    'combined_chunk_0.parquet',
    'combined_chunk_1.parquet',
    'combined_chunk_2.parquet',
    'combined_chunk_3.parquet',
    'combined_chunk_4.parquet',
    'combined_chunk_5.parquet'
]

# Read Parquet files into a dictionary of PyArrow Tables
parquet_tables = {}
for i, file_name in enumerate(parquet_files):
    table_name = f'df{i+1}'
    table = read_parquet_from_s3(bucket_name, parquet_folder, file_name)
    parquet_tables[table_name] = table

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [3]:
print(parquet_tables['df1'].to_pandas().head())

                     timestamp  \
0  2023-07-20 13:00:26.088 UTC   
1  2023-07-20 13:00:43.658 UTC   
2  2023-07-20 13:00:43.705 UTC   
3  2023-07-20 13:01:02.487 UTC   
4  2023-07-20 13:01:40.445 UTC   

                                                user coordinate pixel_color  
0  no+8HEIDjbdx7/LxH9Xr+h4lyoar0MRTYugWKrGdQOg7dF...  -199,-235     #FFFFFF  
1  qJ7O6cuUNfkDyn+ZOEYR+UiVEmAu/vYfm/s4hK0XJytqAx...     0,-298     #FF4500  
2  uqi5XwkBePwcPKJgGOxHKzzzXuZKU6iKZT+OVfUJfaRKek...   -42,-218     #FFFFFF  
3  rgSTj7FHZUHsLXZLLNj9+vxibHIowb2+UhfFLDYgsVBXqI...  -418,-232     #B44AC0  
4  2bmivBNj8NYvnp/15k0EqC+75T1OxTtCUcRG2pf0c5btzF...    182,164     #FF4500  


Create an array of 32 colors used in /rplace for the function to loop through

In [4]:
colors = [
'#6B0119', 
'#244FA4',
'#6D462F',
'#BD0037',
'#3790EA', 
'#9B6926',
'#FF4500',
'#52E8F3',
'#FEB470',
'#FEA800',
'#4839BF',
'#000000',
'#FFD435',
'#695BFF',
'#525252',
'#FEF8B9',
'#94B3FF',
'#888090',
'#01A267',
'#801D9F',
'#D5D6D8',
'#09CC76',
'#B449BF',
'#FFFFFF',
'#7EEC57',
'#E4ABFD',
'#02756D',
'#DD117E',
'#009DAAL',
'#FE3781',
'#00CCB',
'#FE99A9']

Function to detect based on the following steps:

-Grab all pixels with a specific color (from array) and a specific timestamp
-Filter down to whether those pixels has been changed to a different color no matter how late later (however, the timestamp has to be the next one after the original timestamp).
-See whether those pixels are switched back to the original color immediately after it was changed to a different color (within 20 milliseconds).

In [5]:
import pyarrow.parquet as pq
import pandas as pd

def find_color_changes(colors, parquet_tables, output_file):
    # Create an empty list to store the results
    result_tables = []

    # Iterate through each color
    for color in colors:
        # Create variables to store the original color and changed color
        original_color = None
        changed_color = None

        # Loop through Parquet tables to find pixels with the current color
        for table_name, table in parquet_tables.items():
            # Convert Parquet table to pandas DataFrame
            table_df = table.to_pandas()

            # Filter rows with the current color
            filtered_rows = table_df[table_df['pixel_color'] == color]

            # Check if any rows are found with the current color
            if not filtered_rows.empty:
                # If original color is not set, set it to the current color
                if original_color is None:
                    original_color = color
                    # Store matching rows in the result DataFrame
                    result_tables.append(filtered_rows)

                else:
                    # If changed color is not set, set it to the current color
                    if changed_color is None:
                        changed_color = color
                    # If both original and changed colors are set, break the loop
                    if original_color is not None and changed_color is not None:
                        break

        # Check if both original and changed colors are found
        if original_color is not None and changed_color is not None:
            # Find the next timestamp after the original color change
            next_timestamp = pd.Timestamp(result_tables[-1][result_tables[-1]['pixel_color'] == original_color].iloc[0]['timestamp']) + pd.Timedelta(milliseconds=1)

            # Filter rows with the changed color at the next timestamp
            changed_rows = result_tables[-1][(result_tables[-1]['timestamp'] == next_timestamp) & (result_tables[-1]['pixel_color'] == changed_color)]

            # If changed color rows are found, check for reversion to the original color
            if not changed_rows.empty:
                # Find the timestamp range for reversion
                start_time = changed_rows.iloc[0]['timestamp']
                end_time = start_time + pd.Timedelta(milliseconds=20)

                # Filter rows with the original color within the timestamp range
                reverted_rows = result_tables[-1][(result_tables[-1]['timestamp'] >= start_time) & (result_tables[-1]['timestamp'] <= end_time) & (result_tables[-1]['pixel_color'] == original_color)]

                # If reverted rows are found, add them to the result list
                if not reverted_rows.empty:
                    result_tables.append(reverted_rows)

    # Concatenate all result tables into one DataFrame
    result_table = pd.concat(result_tables, ignore_index=True)

    # Convert DataFrame to PyArrow Table
    result_arrow_table = pa.Table.from_pandas(result_table)

    # Write the PyArrow Table to a Parquet file
    pq.write_table(result_arrow_table, output_file)

# Example usage:
find_color_changes(colors, parquet_tables, output_file='result.parquet')


Save result to S3 bucket (subset parquet file)

In [9]:

bucket_name = 'noturs'
parquet_folder = 'rplace-parquet/'
output_file_name = 'result.parquet'  
s3_file_path = f"{parquet_folder}{output_file_name}"  


s3_client = boto3.client('s3')

# Function to upload the file to S3
def upload_file_to_s3(local_file_name, bucket, s3_file_path):
    try:
        s3_client.upload_file(local_file_name, bucket, s3_file_path)
        print(f"File {local_file_name} uploaded to {bucket}/{s3_file_path}")
    except Exception as e:
        print(f"Error uploading file to S3: {e}")


upload_file_to_s3(output_file_name, bucket_name, s3_file_path)


File result.parquet uploaded to noturs/rplace-parquet/result.parquet


Check data quality

In [3]:
import pandas as pd

output_file = 'result.parquet'

result_df = pd.read_parquet(output_file)

num_rows_pandas = len(result_df)

print(f"Number of rows in the pandas DataFrame: {num_rows_pandas}")


  from pandas.core.computation.check import NUMEXPR_INSTALLED


Number of rows in the pandas DataFrame: 15480924


In [4]:
result_df

Unnamed: 0,timestamp,user,coordinate,pixel_color
0,2023-07-20 13:00:43.658 UTC,qJ7O6cuUNfkDyn+ZOEYR+UiVEmAu/vYfm/s4hK0XJytqAx...,"0,-298",#FF4500
1,2023-07-20 13:01:40.445 UTC,2bmivBNj8NYvnp/15k0EqC+75T1OxTtCUcRG2pf0c5btzF...,182164,#FF4500
2,2023-07-20 13:04:21.997 UTC,ZyxEvOoOROWtKV4j+7ReL71h0DlibBSu0Bxdke0dQhbgG2...,"23,-188",#FF4500
3,2023-07-20 13:04:42.711 UTC,OliFXK03g6C/dIlK8W22QblkEC+CraYI86U78uzxSf2fl3...,16337,#FF4500
4,2023-07-20 13:04:45.288 UTC,8fwj/jPEvokM9VJZEDPCaxJLpcih9qOVyudMZT/HpSNt4S...,"-36,-211",#FF4500
...,...,...,...,...
15480919,2023-07-21 16:54:47.428 UTC,Bx9buiUSKRI9pHHSIRIFRPcGvQa0UIknyXCwS2pcDMnqi3...,356123,#FFFFFF
15480920,2023-07-21 16:54:47.43 UTC,2T5WDv5Q53gAphdu+EcpR5bZ5TcsrNyzCdT7Xd3liTp7Gh...,355121,#FFFFFF
15480921,2023-07-21 16:54:47.445 UTC,uoP0ZNA+FnKXEhCLcTz53AjVvDWyV7w5x3Guod8Iy7H4BW...,-126144,#FFFFFF
15480922,2023-07-21 16:54:47.449 UTC,chEQO/6dDwueNdD6xvglAsfje5/CF6lW1eka0f8QH7Cu81...,-107492,#FFFFFF


In [5]:
# Unique Users
result_df['user'].nunique()

2882318

In [6]:
# Count by Color
result_df['pixel_color'].value_counts()

pixel_color
#000000    6202252
#FFFFFF    4756167
#FF4500    4516589
#94B3FF       5916
Name: count, dtype: int64

In [8]:
result_df.dtypes

timestamp      object
user           object
coordinate     object
pixel_color    object
dtype: object

In [12]:
result_df[['x', 'y']] = result_df['coordinate'].str.extract(r'(?P<x>-?\d+)\s*,\s*(?P<y>-?\d+)')


                            timestamp  \
0         2023-07-20 13:00:43.658 UTC   
1         2023-07-20 13:01:40.445 UTC   
2         2023-07-20 13:04:21.997 UTC   
3         2023-07-20 13:04:42.711 UTC   
4         2023-07-20 13:04:45.288 UTC   
...                               ...   
15480919  2023-07-21 16:54:47.428 UTC   
15480920   2023-07-21 16:54:47.43 UTC   
15480921  2023-07-21 16:54:47.445 UTC   
15480922  2023-07-21 16:54:47.449 UTC   
15480923  2023-07-21 16:54:47.449 UTC   

                                                       user coordinate  \
0         qJ7O6cuUNfkDyn+ZOEYR+UiVEmAu/vYfm/s4hK0XJytqAx...     0,-298   
1         2bmivBNj8NYvnp/15k0EqC+75T1OxTtCUcRG2pf0c5btzF...    182,164   
2         ZyxEvOoOROWtKV4j+7ReL71h0DlibBSu0Bxdke0dQhbgG2...    23,-188   
3         OliFXK03g6C/dIlK8W22QblkEC+CraYI86U78uzxSf2fl3...     16,337   
4         8fwj/jPEvokM9VJZEDPCaxJLpcih9qOVyudMZT/HpSNt4S...   -36,-211   
...                                                     ...    

In [27]:
result_df

Unnamed: 0,timestamp,user,coordinate,pixel_color,x,y
0,2023-07-20 13:00:43.658 UTC,qJ7O6cuUNfkDyn+ZOEYR+UiVEmAu/vYfm/s4hK0XJytqAx...,"0,-298",#FF4500,0,-298
1,2023-07-20 13:01:40.445 UTC,2bmivBNj8NYvnp/15k0EqC+75T1OxTtCUcRG2pf0c5btzF...,182164,#FF4500,182,164
2,2023-07-20 13:04:21.997 UTC,ZyxEvOoOROWtKV4j+7ReL71h0DlibBSu0Bxdke0dQhbgG2...,"23,-188",#FF4500,23,-188
3,2023-07-20 13:04:42.711 UTC,OliFXK03g6C/dIlK8W22QblkEC+CraYI86U78uzxSf2fl3...,16337,#FF4500,16,337
4,2023-07-20 13:04:45.288 UTC,8fwj/jPEvokM9VJZEDPCaxJLpcih9qOVyudMZT/HpSNt4S...,"-36,-211",#FF4500,-36,-211
...,...,...,...,...,...,...
15480919,2023-07-21 16:54:47.428 UTC,Bx9buiUSKRI9pHHSIRIFRPcGvQa0UIknyXCwS2pcDMnqi3...,356123,#FFFFFF,356,123
15480920,2023-07-21 16:54:47.43 UTC,2T5WDv5Q53gAphdu+EcpR5bZ5TcsrNyzCdT7Xd3liTp7Gh...,355121,#FFFFFF,355,121
15480921,2023-07-21 16:54:47.445 UTC,uoP0ZNA+FnKXEhCLcTz53AjVvDWyV7w5x3Guod8Iy7H4BW...,-126144,#FFFFFF,-126,144
15480922,2023-07-21 16:54:47.449 UTC,chEQO/6dDwueNdD6xvglAsfje5/CF6lW1eka0f8QH7Cu81...,-107492,#FFFFFF,-107,492


In [29]:
result_df['coordinate'].nunique()

1175531