In [27]:
import pandas as pd
import numpy as np

In [26]:
from fastparquet import ParquetFile
import s3fs
import boto3
from fastparquet.cencoding import from_buffer

In [28]:
import asyncio
import aiohttp

In [4]:
pip install sqlitedict

Note: you may need to restart the kernel to use updated packages.


In [4]:
from collections import defaultdict
from sqlitedict import SqliteDict
import zlib

In [5]:
# setting aws s3 client with boto3 inplicitly

import boto3
import botocore
s3client = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))

In [29]:
# setting aws s3 client with boto3 explicitly

client = boto3.client(
    's3',
    region_name = 'sa-east-1'
)

In [5]:
import pyarrow.dataset as ds

cc_index_s3_path = 's3://commoncrawl/cc-index/table/cc-main/warc/'
cc_index = ds.dataset(cc_index_s3_path, format='parquet', partitioning= 'hive')

In [6]:
fragments = list(
    cc_index.get_fragments(
        filter=(ds.field('crawl')=="CC-MAIN-2022-05") &
        (ds.field('subset')=='warc')
    )
)

len(fragments)

300

In [7]:
%%time
fragments[0].row_groups[0].statistics

CPU times: user 13.4 ms, sys: 1.93 ms, total: 15.4 ms
Wall time: 2.93 s


{'url_surtkey': {'min': 'com,wordpress,freefall852)/2016/03/29/billy-guy',
  'max': 'com,worldpackers)/search/skill_hospitality_entertainment/type_hotel?location_categories[]=nature&location_types[]=hotel&min_meals_count[]=3&months[]=11&skills[]=music'},
 'url': {'min': 'http://03.worldchefsbible.com/',
  'max': 'https://zh.worldallianceofdramatherapy.com/he-mission'},
 'url_host_name': {'min': '03.worldchefsbible.com',
  'max': 'zr1.worldblast.com'},
 'url_host_tld': {'min': 'com', 'max': 'com'},
 'url_host_2nd_last_part': {'min': 'wordpress', 'max': 'worldpackers'},
 'url_host_3rd_last_part': {'min': '03', 'max': 'zr1'},
 'url_host_4th_last_part': {'min': 'bbbfoundation', 'max': 'www'},
 'url_host_5th_last_part': {'min': 'http', 'max': 'toolbox'},
 'url_host_registry_suffix': {'min': 'com', 'max': 'com'},
 'url_host_registered_domain': {'min': 'wordpress.com',
  'max': 'worldpackers.com'},
 'url_host_private_suffix': {'min': 'com', 'max': 'com'},
 'url_host_private_domain': {'min': '

In [9]:
%%time 
fs = s3fs.S3FileSystem()

CPU times: user 18 µs, sys: 10 µs, total: 28 µs
Wall time: 29.3 µs


In [13]:
# setting functions to try to read fragments as parquet file

import time

def perform_s3_operation(fs):
    return  ParquetFile(fn = fragments[0].path, fs=fs)
    
def retry_with_exponential_backoff(fs, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            pf = perform_s3_operation(fs)
            return pf
            
        except Exception as e:
            print(f"Error: {e}")
            retries += 1
            wait_time = (2 ** retries)  # Exponential backoff
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    
    else:
        print("Max retries reached. Exiting.")
    


In [14]:
# trying to read fragments as parquet file

pf = retry_with_exponential_backoff(fs)

# Verificar se pf é None antes de acessar seus atributos
if pf is not None:
    metadata = pf.fmd.row_groups[0].columns[0].meta_data._asdict()
    print(metadata)
else:
    print("ParquetFile object is None.")

Error: [Errno 16] Please reduce your request rate.
Retrying in 2 seconds...
{'type': 6, 'encodings': [0, 4], 'path_in_schema': ['url_surtkey'], 'codec': 2, 'num_values': 1730100, 'total_uncompressed_size': 117917394, 'total_compressed_size': 23113472, 'key_value_metadata': None, 'data_page_offset': 4, 'index_page_offset': None, 'dictionary_page_offset': None, 'statistics': {'max': None, 'min': None, 'null_count': 0, 'distinct_count': None, 'max_value': "b'com,worldpackers)/search/skill_hospitality_entertainment/type_hotel?location_categories[]=nature&location_types[]=hotel&min_meals_count[]=3&months[]=11&skills[]=music'", 'min_value': "b'com,wordpress,freefall852)/2016/03/29/billy-guy'"}, 'encoding_stats': [{'page_type': 0, 'encoding': 0, 'count': 122}], 'bloom_filter_offset': None}


In [15]:
pf.fmd.row_groups[0].columns[0].meta_data._asdict()

{'type': 6,
 'encodings': [0, 4],
 'path_in_schema': ['url_surtkey'],
 'codec': 2,
 'num_values': 1730100,
 'total_uncompressed_size': 117917394,
 'total_compressed_size': 23113472,
 'key_value_metadata': None,
 'data_page_offset': 4,
 'index_page_offset': None,
 'dictionary_page_offset': None,
 'statistics': {'max': None,
  'min': None,
  'null_count': 0,
  'distinct_count': None,
  'max_value': "b'com,worldpackers)/search/skill_hospitality_entertainment/type_hotel?location_categories[]=nature&location_types[]=hotel&min_meals_count[]=3&months[]=11&skills[]=music'",
  'min_value': "b'com,wordpress,freefall852)/2016/03/29/billy-guy'"},
 'encoding_stats': [{'page_type': 0, 'encoding': 0, 'count': 122}],
 'bloom_filter_offset': None}

In [16]:
print(fragments[0].path)

commoncrawl/cc-index/table/cc-main/warc/crawl=CC-MAIN-2022-05/subset=warc/part-00000-1e2959d8-5649-433a-b76e-f1b876a6479d.c000.gz.parquet


In [20]:
file = fragments[0].path
bucket, key =  file.split('/', 1)
metadata = client.head_object(Bucket= bucket, Key = key)
content_length = int(metadata['ContentLength'])
f'{content_length / (1024**3):0.1f} GB'

'1.3 GB'

In [24]:
import time

max_retries = 5
retries = 0
end_byte = content_length
start_byte = end_byte - 8

while retries < max_retries:
    try:
        # S3 request 
        response = client.get_object(Bucket=bucket, Key=key, Range=f'bytes={start_byte}-{end_byte}')
        end_content = response['Body'].read()
        assert end_content[-4:] == b'PAR1'
        file_meta_length = int.from_bytes(end_content[:4], byteorder='little')
        print(f'{file_meta_length / 1024:0.1f} kb')
        break  
    except ClientError as e:
        if e.response['Error']['Code'] == 'SlowDown':
            # Exponential backoff: wait 2^retries seconds before the next attempt
            wait_time = 2 ** retries
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
            retries += 1
        else:
            raise

57.4 kb


In [None]:
local_file_path = '/path/to/your/dataset.parquet'

# Replace this with the desired key (object name) in the S3 bucket
s3_object_key = 'your/dataset.parquet'

# Create an S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Upload the file to S3
s3_client.upload_file(local_file_path, bucket_name, s3_object_key)

print(f"File uploaded to S3 bucket: {bucket_name}/{s3_object_key}")


In [53]:
import pyarrow.parquet as pq


bucket_name = 'actum-dremio-test'
s3_object_key = "cc_parquet_files_1"


In [63]:
pq.write_to_dataset(table=cc_index,
                   root_path = f"s3://{bucket_name}/{s3_object_key}",
                   filesystem=fs)

OSError: AWS Error INTERNAL_FAILURE during GetObject operation: Failed to flush response stream (eof: 0, bad: 1)

In [48]:
print(aws)

OSError: AWS Error INTERNAL_FAILURE during GetObject operation: Failed to flush response stream (eof: 0, bad: 1)

In [50]:
print(cc_index)

<pyarrow._dataset.FileSystemDataset object at 0x7f27f663d5a0>


In [51]:
print(fs)

<s3fs.core.S3FileSystem object at 0x7f27f5d0ede0>


In [62]:
s3client.

{'ResponseMetadata': {'RequestId': 'Q3KY4DCPF75DNEJY',
  'HostId': 'rveADKTzAlC4910S8rW/03VI8HjLfhLRwAy7FYDpgvz9ld6Lohysa0WtCmtuAbQjeVgnC9buLNw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'rveADKTzAlC4910S8rW/03VI8HjLfhLRwAy7FYDpgvz9ld6Lohysa0WtCmtuAbQjeVgnC9buLNw=',
   'x-amz-request-id': 'Q3KY4DCPF75DNEJY',
   'date': 'Tue, 31 Oct 2023 18:21:30 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'actum-dremio-test',
   'CreationDate': datetime.datetime(2023, 8, 18, 1, 4, 40, tzinfo=tzutc())}],
 'Owner': {'DisplayName': 'thiago',
  'ID': 'c244ee0ee4d35733ba20432c77f2c04bf060f4ae67ad19b86e8d6cd402f337d2'}}

## simulating IoT data and populating dataset

### Json dataset

In [2]:
from faker import Faker 
import json
import random
import pandas as pd


In [86]:

num_records = 100000

iot_data = []

for _ in range(num_records):
    sensor_data = {
        "timestamp" : fake.date_time_this_decade(),
        "sensor_id" : fake.uuid4(),
        "temperature" : round(random.uniform(10.0, 40.0), 2),
        "humidity" : round(random.uniform(30.0, 70.0), 2),
        "pressure" : round(random.uniform(900.0, 1100.0), 2)}
    iot_data.append(sensor_data)
df = pd.DataFrame(iot_data)

json_file_path = 'iot_data.json'
df.to_json(json_file_path, orient='records', lines=True, index = False)

print(f"JSON data saved to {json_file_path}")



JSON data saved to iot_data.json
CPU times: user 1.62 s, sys: 23.3 ms, total: 1.64 s
Wall time: 1.65 s


In [88]:
%%time
df_j = pd.read_json("iot_data.json", lines=True)

CPU times: user 111 ms, sys: 3.09 ms, total: 114 ms
Wall time: 113 ms


In [84]:
df_j

Unnamed: 0,timestamp,sensor_id,temperature,humidity,pressure
0,2022-01-18 01:05:22,9eb4961a-96a8-43e1-9209-943bb188ac29,17.82,44.12,965.55
1,2021-02-25 00:27:57,acdfa8af-01ad-41b9-8eb8-311a5321e0fc,11.53,66.15,1041.53
2,2021-03-20 00:18:52,63ff7b38-6a0c-42d9-8ce4-b77fadb52591,14.92,63.45,910.51
3,2020-05-27 10:13:43,ca7a5b19-5c4f-4db9-93a5-fa42353b866b,19.31,69.02,951.79
4,2022-09-01 09:30:40,6374128b-307e-45bf-8d9a-a17b9a7f66b1,39.67,30.98,1067.43
...,...,...,...,...,...
99995,2023-02-07 19:58:51,1e62f3f3-79f5-4013-9d57-607074eafc89,15.33,57.39,977.76
99996,2021-02-02 23:51:12,23196907-5da5-431c-8991-ca16792c6841,32.16,51.95,1098.00
99997,2023-03-28 17:46:30,0ef4d2d3-1c9a-437e-8640-05b5deae50b8,39.93,67.69,912.81
99998,2020-10-13 20:27:54,690f3df0-20db-435d-b1f7-942900d39a2d,11.82,61.61,970.78


In [91]:
%%time
# opening json archives without pandas intermediation

import pyarrow.json as pj

# Read JSON data into a PyArrow Table
json_file_path = 'iot_data.json'
table = pj.read_json(json_file_path)

# Create a PyArrow dataset
dataset = ds.dataset(table)

CPU times: user 90.8 ms, sys: 20.9 ms, total: 112 ms
Wall time: 24.4 ms


In [96]:
table = dataset.to_table()
print(table)

pyarrow.Table
timestamp: int64
sensor_id: string
temperature: double
humidity: double
pressure: double
----
timestamp: [[1659743331000,1697612384000,1598944995000,1643899053000,1676196307000,...,1687278198000,1611804244000,1649632217000,1602693578000,1642134835000],[1632432676000,1642973198000,1675774623000,1688439565000,1598947464000,...,1672630884000,1606694082000,1682485423000,1654479183000,1638210124000],...,[1698350893000,1615559180000,1661369318000,1598304024000,1611933890000,...,1652400479000,1683826845000,1698738956000,1581918698000,1661386617000],[1616608858000,1694876950000,1653201859000,1680258812000,1590749735000,...,1613695189000,1670505827000,1639539244000,1676449453000,1695228825000]]
sensor_id: [["f29dc7bd-53e5-4990-9212-40b5ad1f5ace","7de362dd-ce84-446f-86bf-0cda9b3ad855","2908fb58-f981-4c53-920f-34d0d4c21811","57e183c2-ae91-4add-a5a0-0f6bf7d11e8f","52ece0d4-3578-4a16-a352-6d5f6f6bbfc7",...,"33b2562e-6f27-41ed-ba8b-665e39ec079b","56542ffd-3895-4e57-b70e-5aa9b73a5db7","

In [120]:
%%time
# opening json archives with pandas intermediation


import pyarrow as pa
import pyarrow.dataset as ds
import pandas as pd
%%time
# Read JSON data into a Pandas DataFrame
json_file_path = 'iot_data.json'
df = pd.read_json(json_file_path, lines=True)

# Convert Pandas DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)

# Create a PyArrow dataset
dataset = ds.dataset(table)


UsageError: Line magic function `%%time` not found.


In [116]:
print(dataset)

<pyarrow._dataset.InMemoryDataset object at 0x7f24dd11f6a0>


In [121]:
arrow_file_path = ''

# Write the Table to the Arrow file
with pa.OSFile(arrow_file_path, 'wb') as f:
    with pa.RecordBatchFileWriter(f, table.schema) as writer:
        writer.write_table(table)

In [122]:
for i, iteration_table in enumerate(iterations):
    # Specify the file path for each iteration
    arrow_file_path = f'iteration_{i}.arrow'

    # Write the Table to the Arrow file
    with pa.OSFile(arrow_file_path, 'wb') as f:
        with pa.RecordBatchFileWriter(f, iteration_table.schema) as writer:
            writer.write_table(iteration_table)

    # Upload the Arrow file to S3
    s3_object_key = f'{s3_object_prefix}iteration_{i}.arrow'
    try:
        s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
        s3_client.upload_file(arrow_file_path, bucket_name, s3_object_key)
        print(f'Successfully uploaded iteration {i} to S3: {s3_object_key}')
    except NoCredentialsError:
        print('Credentials not available')


NameError: name 'iterations' is not defined

In [11]:
import pyarrow as pa
import pyarrow.dataset as ds
import pandas as pd
from datetime import datetime
import boto3
import s3fs
import os
from faker import Faker

In [4]:
fake = Faker()

In [5]:
client = boto3.client('s3',
    region_name = 'sa-east-1')

In [6]:
bucket_name = 'actum-dremio-test'
pyarrow_files = "pyarrow_files"


In [None]:
%%time

num_records = 30653

for i in range(num_records):
    # Generate your IoT data for each iteration
    sensor_data = {
        "timestamp" : fake.date_time_this_decade(),
        "sensor_id" : fake.uuid4(),
        "temperature" : round(random.uniform(10.0, 40.0), 2),
        "humidity" : round(random.uniform(30.0, 70.0), 2),
        "pressure" : round(random.uniform(900.0, 1100.0), 2)
    }


    
    # Convert the record to a PyArrow Table
    table = pa.Table.from_pandas(pd.DataFrame([sensor_data]))

    s3_object_prefix = fake.coordinate()
    
    # Specify the S3 object key for each iteration
    s3_object_key = f'{s3_object_prefix}_iteration_{i}.arrow'

    pyarrow_file_c = os.path.join(pyarrow_files, os.path.basename(s3_object_key))

    # Upload the PyArrow Table to S3
    try:
        with pa.BufferOutputStream() as stream:
            with pa.RecordBatchFileWriter(stream, table.schema) as writer:
                writer.write_table(table)

            client.put_object(Body=stream.getvalue().to_pybytes(), Bucket=bucket_name, Key=pyarrow_file_c)
            print(f'Successfully uploaded iteration {i} to S3: {s3_object_key}')
    except Exception as e:
        print('Credentials not available')

Successfully uploaded iteration 0 to S3: 95.023589_iteration_0.arrow
Successfully uploaded iteration 1 to S3: -104.857280_iteration_1.arrow
Successfully uploaded iteration 2 to S3: -156.254695_iteration_2.arrow
Successfully uploaded iteration 3 to S3: 21.763416_iteration_3.arrow
Successfully uploaded iteration 4 to S3: -81.918108_iteration_4.arrow
Successfully uploaded iteration 5 to S3: 176.593544_iteration_5.arrow
Successfully uploaded iteration 6 to S3: -153.452677_iteration_6.arrow
Successfully uploaded iteration 7 to S3: 116.513618_iteration_7.arrow
Successfully uploaded iteration 8 to S3: 80.497967_iteration_8.arrow
Successfully uploaded iteration 9 to S3: -137.060429_iteration_9.arrow
Successfully uploaded iteration 10 to S3: 93.962801_iteration_10.arrow
Successfully uploaded iteration 11 to S3: -13.731813_iteration_11.arrow
Successfully uploaded iteration 12 to S3: 8.254772_iteration_12.arrow
Successfully uploaded iteration 13 to S3: -102.066897_iteration_13.arrow
Successfully 

In [None]:
from datetime import datetime

In [None]:
agora = datetime.now()
print("Tempo atual:", agora)

## Renaming access keys to fix Parameter validation error 

In [10]:
!aws s3 mv s3://actum-dremio-test/json_files// s3://actum-dremio-test/json_files/files --recursive

In [None]:
# renaming

response = client.list_objects_v2(Bucket='actum-dremio-test/json_files//')

object_count = response['KeyCount']
print(f"Number of objects in the bucket: {object_count}")

In [22]:
with open("v1.metadata.json", "r") as f:
    s = f.read()
    print(s)

{
  "format-version" : 1,
  "table-uuid" : "cdd1cf43-e463-4ff1-94ab-fd1e16b000b6",
  "location" : "s3://actum-dremio-test2/testing4j",
  "last-updated-ms" : 1699328925194,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "humidity",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 2,
      "name" : "pressure",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 3,
      "name" : "sensor_id",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 4,
      "name" : "temperature",
      "required" : false,
      "type" : "double"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "humidity",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 2,
      "name" : "pressure",
      "required" : false,
      "type" : "double"
    }, 

In [25]:
df = pd.read_json("v1.metadata.json")
import pandas as pd

# Your JSON data
json_data = {s}

# Extract the "fields" array from the "schema" dictionary
schema_fields = json_data["schema"]["fields"]

# Create an empty DataFrame
df = pd.DataFrame()

# Populate the DataFrame with the fields from "schema" and "schemas"
for schema in json_data["schemas"]:
    schema_fields += schema["fields"]

    # Extract the field names and types
    field_names = [field["name"] for field in schema["fields"]]
    field_types = [field["type"] for field in schema["fields"]]

    # Initialize an empty DataFrame with the extracted field names
    schema_df = pd.DataFrame(columns=field_names)

    # Convert the field types to Pandas types (you may need to adjust this based on your actual data)
    type_mapping = {
        "string": str,
        "double": float,
        # Add more types as needed
    }
    schema_df = schema_df.astype({name: type_mapping[type_] for name, type_ in zip(field_names, field_types)})

    # Append the empty DataFrame to the main DataFrame
    df = pd.concat([df, schema_df])

print(df)

ValueError: All arrays must be of the same length

In [15]:
df

Unnamed: 0,timestamp,sensor_id,temperature,humidity,pressure
0,2022-12-12 04:13:40,b21c8450-a4bd-40a8-be34-c5f48b7870b8,26.79,64.26,1004.13
1,2021-06-04 02:03:18,76b70335-2c70-4c13-8f49-6e703ec8b34e,13.55,54.51,1030.19
2,2022-11-30 12:06:23,bb244628-9883-4bde-a9c8-cd218798579f,29.12,53.62,1098.47
3,2023-03-28 01:13:30,2bde00cf-f9a3-4211-b5ed-5315b831662b,10.15,42.32,1051.74


In [39]:
import json
import fastparquet

json_data = {
    "day": "Monday",
    "month": "January",
    "year": 2023,
    "tenantId": "your_tenant_id",
    "eventType": "sample_event",
    "eventId": "sample_event_id",
    "data": {
        "event": {
            "eventId": "catuml8t10jfbd5u8apg",
            "targetName": "projects/c8caej0697o000f6gmm0/devices/c095a816895g00821adg",
            "eventType": "humidity",
            "data": {
                "humidity": {
                    "temperature": 25.9,
                    "relativeHumidity": 49,
                    "updateTime": "2022-06-23T14:37:59.935000Z"
                }
            },
            "timestamp": "2022-06-23T14:37:59.935000Z"
        },
        "metadata": {
            "deviceId": "095a816895g00821adg",
            "projectId": "c8caej0697o000f6gmm0",
            "deviceType": "humidity",
            "productNumber": "102081"
        }
    }
}

parquet_file_path = "example_fastparquet.parquet"

fastparquet.write(parquet_file_path, json_data)

client.put_object(Body=stream.getvalue().to_pybytes(), Bucket=bucket_name, Key=pyarrow_file_c)
client.upload_file(arrow_file_path, bucket_name, s3_object_key)

AttributeError: 'dict' object has no attribute 'columns'

In [41]:
%%time

import pandas as pd
import fastparquet

# Your JSON data
json_data = {
    "day": ["Monday"],
    "month": ["January"],
    "year": [2023],
    "tenantId": ["your_tenant_id"],
    "eventType": ["sample_event"],
    "eventId": ["sample_event_id"],
    "data": [
        {
            "event": {
                "eventId": "catuml8t10jfbd5u8apg",
                "targetName": "projects/c8caej0697o000f6gmm0/devices/c095a816895g00821adg",
                "eventType": "humidity",
                "data": {
                    "humidity": {
                        "temperature": 25.9,
                        "relativeHumidity": 49,
                        "updateTime": "2022-06-23T14:37:59.935000Z"
                    }
                },
                "timestamp": "2022-06-23T14:37:59.935000Z"
            },
            "metadata": {
                "deviceId": "095a816895g00821adg",
                "projectId": "c8caej0697o000f6gmm0",
                "deviceType": "humidity",
                "productNumber": "102081"
            }
        }
    ]
}

# Convert the JSON data to a Pandas DataFrame
df = pd.json_normalize(json_data)

# Specify the Parquet file path
parquet_file_path = "example_fastparquet.parquet"

# Write the Pandas DataFrame to a Parquet file using fastparquet
fastparquet.write(parquet_file_path, df)


CPU times: user 5.48 ms, sys: 0 ns, total: 5.48 ms
Wall time: 5.96 ms


In [36]:
datetime.now()

datetime.datetime(2023, 11, 8, 1, 25, 19, 6084)

In [42]:
df

Unnamed: 0,day,month,year,tenantId,eventType,eventId,data
0,[Monday],[January],[2023],[your_tenant_id],[sample_event],[sample_event_id],"[{'event': {'eventId': 'catuml8t10jfbd5u8apg',..."
