In [1]:
# %pip installs into the environment of the running kernel; `!pip` may target a different interpreter.
# NOTE(review): the notebook also imports nest_asyncio, pyarrow, pandas and google-cloud-storage — install them too if the kernel lacks them.
%pip install faker



In [8]:
import asyncio
import io
import os
import random
import string
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import nest_asyncio
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from faker import Faker
from google.cloud import storage
from google.oauth2 import service_account

# Helper function to generate random IDs
def id_generator(size=6, chars=string.ascii_lowercase + string.digits):
    """Return a random identifier of ``size`` characters drawn from ``chars``.

    Uses ``random.SystemRandom`` (OS entropy), so the result is NOT affected by
    ``random.seed`` / ``Faker.seed`` and differs between runs.

    Args:
        size: Number of characters in the returned ID (``<= 0`` gives ``""``).
        chars: Alphabet to draw from. Defaults to lowercase ASCII letters plus
            digits — the alphabet this helper always used, even though the
            argument was previously ignored.

    Returns:
        A string of length ``max(size, 0)``.
    """
    rng = random.SystemRandom()  # one generator per call rather than one per character
    return ''.join(rng.choice(chars) for _ in range(size))

# Seed every RNG the generation code draws from. Faker.seed does not seed the
# global `random` module used by random.choice / random.uniform, so seed it too.
# id_generator uses random.SystemRandom, which ignores seeds, so project,
# tenant and event IDs still differ between runs.
SEED = 100
Faker.seed(SEED)
random.seed(SEED)
fake = Faker()

# Synthetic project and tenant IDs (4-character, see id_generator).
projects = [id_generator(4) for _ in range(4)]
tenants = [id_generator(4) for _ in range(10)]

# Candidate event types; the empty string is kept as in the original list.
events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

# GCP configuration. Both values can be overridden from the environment so the
# service-account key location does not have to live in the notebook.
gcp_bucket_name = os.environ.get("GCS_BUCKET_NAME", "myfirstproject-inputbucket")
gcp_credentials_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS", "double-balm-438113-f5-53067ace2784.json"
)

# Allow run_until_complete to be called while Jupyter's own event loop is running.
nest_asyncio.apply()

# Arrow schema for the generated event files, built bottom-up: the innermost
# humidity reading first, the top-level row schema last.
humidity_schema = pa.struct([
    pa.field('temperature', pa.float64()),
    pa.field('relativeHumidity', pa.float64()),
    pa.field('updateTime', pa.timestamp('ms')),
])

# Sensor payload carried inside an event.
event_data_schema = pa.struct([pa.field('humidity', humidity_schema)])

# The event envelope.
event_schema = pa.struct([
    pa.field('eventId', pa.string()),
    pa.field('targetName', pa.string()),
    pa.field('eventType', pa.string()),
    pa.field('data', event_data_schema),
    pa.field('timestamp', pa.timestamp('ms')),
])

# Device / project the event is attributed to.
metadata_schema = pa.struct([
    pa.field('deviceId', pa.string()),
    pa.field('projectId', pa.string()),
    pa.field('deviceType', pa.string()),
    pa.field('productNumber', pa.string()),
])

# Nested payload column: event plus metadata.
data_schema = pa.struct([
    pa.field('event', event_schema),
    pa.field('metadata', metadata_schema),
])

# Top-level row: date parts, tenant/event identifiers and the nested payload.
schema = pa.schema([
    pa.field('day', pa.int64()),
    pa.field('month', pa.int64()),
    pa.field('year', pa.int64()),
    pa.field('tenantId', pa.string()),
    pa.field('eventType', pa.string()),
    pa.field('eventId', pa.string()),
    pa.field('data', data_schema),
])

def upload_to_gcs(bucket, blob_name, data):
    """Write ``data`` to the object ``blob_name`` in ``bucket`` and log its name.

    Args:
        bucket: Bucket object exposing a ``blob(name)`` factory (a GCS bucket here).
        blob_name: Name of the object to create or overwrite.
        data: Payload handed unchanged to ``Blob.upload_from_string``.
    """
    target = bucket.blob(blob_name)
    target.upload_from_string(data)
    print('Successfully uploaded file to GCS: {}'.format(blob_name))

def _sample_record(tenant_id, project, event_type, event_id, update_time):
    """Build one synthetic event row as nested dicts matching the Arrow ``schema``.

    day/month/year are taken from ``update_time`` so the partition columns
    always agree with the event timestamp (drawing them independently could
    produce impossible dates such as 31 February).
    """
    device_id = f"{event_id}adg"
    return {
        "day": update_time.day,
        "month": update_time.month,
        "year": update_time.year,
        "tenantId": tenant_id,
        "eventType": event_type,
        "eventId": event_id,
        "data": {
            "event": {
                "eventId": event_id,
                "targetName": f"projects/{project}/devices/{device_id}",
                "eventType": event_type,
                "data": {
                    "humidity": {
                        "temperature": round(random.uniform(10.0, 40.0), 2),
                        "relativeHumidity": round(random.uniform(30.0, 70.0), 2),
                        "updateTime": update_time,
                    }
                },
                "timestamp": update_time,
            },
            "metadata": {
                "deviceId": device_id,
                "projectId": project,
                "deviceType": event_type,
                "productNumber": "102081",
            },
        },
    }


async def generate_and_upload_data(i, bucket, executor):
    """Generate one synthetic event, serialise it to Parquet and upload it to GCS.

    Args:
        i: Iteration index; only used in the error message.
        bucket: GCS bucket forwarded to ``upload_to_gcs``.
        executor: Executor that runs the blocking upload off the event loop.

    Errors raised while serialising or uploading are printed and swallowed
    rather than propagated to the caller; errors while building the record
    still propagate.
    """
    tenant_id = random.choice(tenants)
    project = random.choice(projects)
    event_type = random.choice(events)
    event_id = f"{tenant_id}_{id_generator(4)}"
    update_time = fake.date_time_this_year()

    record = _sample_record(tenant_id, project, event_type, event_id, update_time)
    df = pd.DataFrame([record]).astype({"day": "int64", "month": "int64", "year": "int64"})
    table = pa.Table.from_pandas(df, schema=schema)

    try:
        with io.BytesIO() as buffer:
            pq.write_table(table, buffer, coerce_timestamps='ms')
            payload = buffer.getvalue()
        blob_name = f"{event_id}_.parquet"
        # The GCS client call blocks, so run it in the executor; this lets the
        # other gathered coroutines proceed concurrently.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, upload_to_gcs, bucket, blob_name, payload)
    except Exception as e:
        print(f'Error on iteration {i}: {e}')

async def main(n_rows=3, max_workers=10):
    """Generate and upload ``n_rows`` single-row Parquet files concurrently.

    Args:
        n_rows: Number of events to generate; each becomes its own file
            holding exactly one row.
        max_workers: Size of the thread pool that runs the blocking GCS uploads.
    """
    # Set up GCP credentials and client
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(gcp_bucket_name)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # gather() schedules bare coroutines as tasks itself, so no ensure_future is needed.
        await asyncio.gather(
            *(generate_and_upload_data(i, bucket, executor) for i in range(n_rows))
        )

# Run the main coroutine and report wall-clock timing.
# In a Jupyter kernel __name__ is also "__main__", so this runs when the cell
# executes (see the captured output below the cell).
if __name__ == "__main__":
    before = datetime.now()
    print("Start time:", before)

    # Driving the loop with run_until_complete while the notebook's own loop is
    # running relies on nest_asyncio.apply() having been called earlier in this cell.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    after = datetime.now()
    print("End time:", after)
    # Difference of two datetime.now() values (a timedelta); measures wall-clock
    # time, so it is affected by system clock adjustments.
    print("Elapsed time:", after - before)

Start time: 2024-10-18 08:28:42.700483
Successfully uploaded file to GCS: yk53_av8o_.parquet
Successfully uploaded file to GCS: i1qp_3mdc_.parquet
Successfully uploaded file to GCS: 8j9f_c4hd_.parquet
End time: 2024-10-18 08:28:43.941967
Elapsed time: 0:00:01.241484
