In [None]:
# pip install fsspec gcsfs pandas pyarrow

In [2]:
import pandas as pd
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
import os

def tree(fs, path, prefix=""):
    """Recursively list the contents of a directory in a tree-like format."""
    # print(path)
    items = fs.ls(path, detail=True)
    for i, item in enumerate(items):
        is_last = i == (len(items) - 1)  # Check if this is the last item
        if item['type'] == 'directory':
            # Print the directory name
            print(f"{prefix}{'└── ' if is_last else '├── '}{item['name'].split('/')[-1]}")
            # Recursively list this directory's contents
            new_prefix = prefix + ('    ' if is_last else '│   ')
            tree(fs, item['name'], new_prefix)
        else:
            # Print the file name
            print(f"{prefix}{'└── ' if is_last else '├── '}{item['name'].split('/')[-1]}")


### Local File System

In [3]:
# fs = fsspec.filesystem('file',auto_mkdir=True) #fsspec.filesystem(catalog_path, auto_mkdir=False)

# catalog_path='./catalog/'
# with fs.open(catalog_path+'readme.md','wb') as f:
#     f.write(b'# Hello')
    
# tree(fs,catalog_path)

### Google Cloud Storage(GCS)

In [4]:
gcs_token_path = os.path.join('../_env/teacher-dsi310-2023.json')
catalog_path = 'gcs://dsi310_bucket/'
fs=fsspec.filesystem('gcs', token=gcs_token_path)
with fs.open(catalog_path+'readme.md','wb') as f:
    f.write(b'# Hello')
tree(fs,catalog_path)

├── 6424650049
│   ├── date=2023-01-01
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-02
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-03
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-04
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   └── date=2023-01-05
│       └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
├── 6424650254
│   ├── date=2023-01-01
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-02
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-03
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   

In [5]:
# Example DataFrame
df = pd.DataFrame({
    'date': pd.date_range(start='2023-01-01', periods=5, freq='D'),
    'product_id': range(1, 6),
    'quantity': [5, 3, 6, 2, 7],
    'price': [20.5, 10.0, 15.5, 25.0, 30.0]
})

# Metadata and Data Dictionary
metadata = {'source': 'Sales System', 'creation_date': datetime.now().isoformat()}
data_dictionary = {
    'date': 'Transaction date',
    'product_id': 'Product identifier',
    'quantity': 'Quantity sold',
    'price': 'Sale price'
}

# Convert DataFrame to PyArrow Table with metadata
table = pa.Table.from_pandas(df)
table = table.replace_schema_metadata({'metadata': str(metadata), 'dictionary': str(data_dictionary)})

table.schema

date: timestamp[ns]
product_id: int64
quantity: int64
price: double
-- schema metadata --
metadata: '{'source': 'Sales System', 'creation_date': '2023-11-23T13:51:' + 11
dictionary: '{'date': 'Transaction date', 'product_id': 'Product identifi' + 56

In [6]:
# Extracting metadata and data dictionary
metadata = eval(table.schema.metadata[b'metadata']) if b'metadata' in table.schema.metadata else None
data_dictionary = eval(table.schema.metadata[b'dictionary']) if b'dictionary' in table.schema.metadata else None

# Display the DataFrame, Metadata, and Data Dictionary
print("Metadata:", metadata)
print("Data Dictionary:", data_dictionary)

Metadata: {'source': 'Sales System', 'creation_date': '2023-11-23T13:51:56.088774'}
Data Dictionary: {'date': 'Transaction date', 'product_id': 'Product identifier', 'quantity': 'Quantity sold', 'price': 'Sale price'}


In [7]:
# Write to GCS
# gcs_path = 'gcs://dsi310_bucket/sales_data.parquet'  # Replace with your bucket path
with fs.open(path=catalog_path+'sale.parquet',mode='wb') as f:  # Replace with your GCS token
    pq.write_table(table, f)

tree(fs,catalog_path)

├── 6424650049
│   ├── date=2023-01-01
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-02
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-03
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-04
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   └── date=2023-01-05
│       └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
├── 6424650254
│   ├── date=2023-01-01
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-02
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-03
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   

In [8]:
import pandas as pd
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime

# Example DataFrame
df = pd.DataFrame({
    'date': pd.date_range(start='2023-01-01', periods=5, freq='D'),
    'product_id': range(1, 6),
    'quantity': [5, 3, 6, 2, 7],
    'price': [20.5, 10.0, 15.5, 25.0, 30.0]
})
df['date'] = df['date'].dt.date
# Metadata and Data Dictionary
metadata = {'source': 'Sales System', 'creation_date': datetime.now().isoformat()}
data_dictionary = {
    'date': 'Transaction date',
    'product_id': 'Product identifier',
    'quantity': 'Quantity sold',
    'price': 'Sale price'
}

# Convert DataFrame to PyArrow Table with metadata
table = pa.Table.from_pandas(df)
table = table.replace_schema_metadata({'metadata': str(metadata), 'dictionary': str(data_dictionary)})

df

Unnamed: 0,date,product_id,quantity,price
0,2023-01-01,1,5,20.5
1,2023-01-02,2,3,10.0
2,2023-01-03,3,6,15.5
3,2023-01-04,4,2,25.0
4,2023-01-05,5,7,30.0


In [9]:
# Define GCS path and write to GCS

# No need to open a file with fsspec, use the path directly
dataset_name ='sale'
path = catalog_path+dataset_name
path

'gcs://dsi310_bucket/sale'

In [10]:
pq.write_to_dataset(table, root_path=path, partition_cols=['date'], filesystem=fs,)

tree(fs,catalog_path)

├── 6424650049
│   ├── date=2023-01-01
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-02
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-03
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   ├── date=2023-01-04
│   │   └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
│   └── date=2023-01-05
│       └── 1ad6611902bb4b2b9e7eb74ebbaaed38-0.parquet
├── 6424650254
│   ├── date=2023-01-01
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-02
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   ├── a224e7c66d0d44df96fc04ddb6bf1ab7-0.parquet
│   │   ├── b181ff657b294ec6ac84baed7496eb53-0.parquet
│   │   └── e3369d2195f94d7ca0ad6f1a5f8c5a53-0.parquet
│   ├── date=2023-01-03
│   │   ├── 3e807bb00f524531a24597b996b77e37-0.parquet
│   │   

In [12]:
catalog_path+'sale'


'gcs://dsi310_bucket/sale'

In [13]:

import pyarrow.dataset as ds
# dataset = ds.dataset('/dsi310_bucket/sale/', format="parquet", filesystem=fs)


dataset = pq.ParquetDataset(path_or_paths=catalog_path+'sale/*/*.parquet', filesystem=fs)
table = dataset.read()

table.schema

FileNotFoundError: gcs://dsi310_bucket/sale/*/*.parquet

In [None]:
# Extracting metadata and data dictionary
metadata = eval(table.schema.metadata[b'metadata']) if b'metadata' in table.schema.metadata else None
data_dictionary = eval(table.schema.metadata[b'dictionary']) if b'dictionary' in table.schema.metadata else None

# Display the DataFrame, Metadata, and Data Dictionary
print("Metadata:", metadata)
print("Data Dictionary:", data_dictionary)

In [None]:
# Convert to Pandas DataFrame
df = table.to_pandas()

# Display the DataFrame
print(df.head())