In [None]:
import ray
import rayvens
import sys
import time
import os
import yaml
import json

import pandas as pd
from io import StringIO

In [None]:
import random
import string

def generate_id(N: int=8) -> str:
    """Return a random identifier made of ``N`` lowercase letters and digits.

    Used to give the test files created in this notebook distinct names.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=N)
    return ''.join(picked)

# File System Directory as Source and Sink
___

This tutorial will demo how to read, process, and upload files from and to a ```rayvens``` stream, where the source and sink of the stream are directories in the local file system.
___

## Run Parameters

Let's first define some run parameters for ```rayvens```:

* ```run_mode```: for the rayvens run mode. We will demonstrate on the local run mode.
* ```after_idle_for```: idle time before stream is disconnected.
* ```rayvens_logs```: whether to print the full ```rayvens``` logs. We will opt not to print them.

In [None]:
from collections import namedtuple

# Immutable container bundling the three run parameters.
Args = namedtuple('Args', ['run_mode', 'after_idle_for', 'rayvens_logs'])

run_mode = 'local'      # rayvens run mode; this demo uses the local mode
after_idle_for = 5      # idle time before the stream is disconnected
rayvens_logs = False    # whether to print the full rayvens logs

args = Args(
    run_mode=run_mode,
    after_idle_for=after_idle_for,
    rayvens_logs=rayvens_logs,
)

## Initialization

We now initialize ```ray``` and ```rayvens```. 

In [None]:
import ray
import rayvens

# Operator mode additionally passes address='auto' to ray.init; every other
# mode relies on Ray's defaults. Re-initialisation is tolerated in both cases
# so the cell can be re-run.
ray_init_options = {'ignore_reinit_error': True}
if args.run_mode == 'operator':
    ray_init_options['address'] = 'auto'
ray.init(**ray_init_options)

# `release` is the inverse of the rayvens_logs run parameter.
rayvens.init(mode=args.run_mode, release=not args.rayvens_logs)

## Source and Sink Configurations

We choose a file system directory ```path```, which we will use both to read and process files as well as to upload files. 

By default, we choose ```path``` to be a new directory ```rayvens_directory_test``` under the current working directory.

We choose an additional file system directory ```path_dest``` under ```path``` into which files will be transferred once processed.

NOTE: change these default path values as needed.

In [None]:
path = './rayvens_directory_test' # will be created in current working directory
path_dest = './processed' # will be created under path

# Create `path` only when nothing exists there yet: the path may also point
# at an existing file, which must be left untouched.
if not os.path.exists(path):
    # makedirs (rather than mkdir) also creates missing parent directories, so
    # a customised nested path works; exist_ok tolerates the directory being
    # created between the existence check and this call.
    os.makedirs(path, exist_ok=True)

In [None]:
from pathlib import Path

# Derive the source and sink locations from `path`, which may name either a
# directory or a single file.
stream_path = Path(path)

# Anything that is neither a directory nor a regular file is rejected up front.
if not (stream_path.is_dir() or stream_path.is_file()):
    raise TypeError(f'Path {path} must either be a directory or a file.')

# The source always reads from the given path itself.
source_path = str(stream_path)

if stream_path.is_dir():
    # Directory: uploads go into that same directory.
    sink_path = source_path
else:
    # Single file: uploads go into the directory that contains the file.
    filename = str(stream_path.name)
    sink_path = str(stream_path.parent)

We define the configurations for the stream source and sink.

NOTE: we can either choose to keep the files in the source directory with ```keep_file=True``` or move them to a different directory by specifying one under ```move_after_read```. These two options are MUTUALLY EXCLUSIVE.

In [None]:
# Source: read files from source_path; files are not kept in place but moved
# to path_dest after being read.
source_config = {
    'kind': 'file-source',
    'path': source_path,
    'keep_file': False,
    'move_after_read': path_dest,
}

# Sink: files sent to the stream are written into sink_path.
sink_config = {
    'kind': 'file-sink',
    'path': sink_path,
}

## rayvens Sink

We are ready to test the given file system directory under ```path``` as a ```rayvens``` stream sink. 

We will use the ```rayvens``` stream to upload a json file to the directory under ```path```.

In [None]:
# A random suffix gives every run its own test file name.
json_name = f"file_{generate_id()}.json"
# Location where the file is expected to appear once uploaded through the sink.
json_path = os.path.join(sink_path, json_name)
# Payload that will be serialised to JSON and uploaded.
json_content = dict(content=('foo', None, 1.0, 2))

In [None]:
# Stream whose sink writes into the target directory.
stream = rayvens.Stream('files-upload')
sink = stream.add_sink(sink_config)

# The event body is the serialised JSON; the CamelFileName header carries the
# name of the file to write.
upload_body = json.dumps(json_content)
upload_headers = {"CamelFileName": json_name}
event = rayvens.OutputEvent(upload_body, upload_headers)

# Push the event into the stream, then disconnect once it has been idle.
stream << event
stream.disconnect_all(after_idle_for=args.after_idle_for)

In [None]:
# The upload succeeded if the file now exists at the expected sink location.
upload_succeeded = os.path.exists(json_path)
print('Upload of file {} successful: {}.'.format(json_name, upload_succeeded))

## rayvens Source

We will now test the given file system directory under ```path``` as a ```rayvens``` source.

NOTE: Moving will delete all files under ```path``` once they are processed and transfer them to the directory under ```path_dest```.

In [None]:
# Stream used to read the files back from the directory.
stream = rayvens.Stream('files-download')

# Attach the file source described by source_config to the stream.
source = stream.add_source(source_config)

def process_file(event):
    """Print the file name and body carried by a JSON-encoded file event.

    ``event`` is a JSON string with at least the keys ``filename`` and ``body``.
    """
    payload = json.loads(event)
    name, body = payload['filename'], payload['body']
    print(f"File name: {name}, Contents: {body}")

# Hand the stream's events to process_file via the rayvens `>>` operator
# (presumably this subscribes the callable to the stream; confirm against the
# rayvens documentation).
stream >> process_file

# Disconnect once the stream has been idle for args.after_idle_for.
stream.disconnect_all(after_idle_for=args.after_idle_for)

In [None]:
# With move_after_read configured, the uploaded file is expected to have left `path`.
still_in_path = os.path.exists(json_path)
print('Files kept in path:', still_in_path)

## rayvens Raw Source

```rayvens``` also has the option to get the raw file data directly, as opposed to the above event json format.

This option does not have file move functionality. It is intended for scenarios where file name is redundant and file size may be a major concern.

First, we will create a new test file.

In [None]:
# A random suffix gives the raw-source test file its own name.
json_name_raw = f"file_{generate_id()}.json"
# Location where the file is expected to appear once uploaded through the sink.
json_path_raw = os.path.join(sink_path, json_name_raw)
# Payload that will be serialised to JSON and uploaded.
json_content_raw = dict(content=('bar', None, 1.0, 2))

And upload it to the sink path.

In [None]:
# Sink: files sent to the stream are written into sink_path.
sink_config_raw = {'kind': 'file-sink', 'path': sink_path}

stream = rayvens.Stream('files-upload-raw')
sink = stream.add_sink(sink_config_raw)

# The event body is the serialised JSON; the CamelFileName header carries the
# name of the file to write.
raw_body = json.dumps(json_content_raw)
raw_headers = {"CamelFileName": json_name_raw}
event = rayvens.OutputEvent(raw_body, raw_headers)

# Push the event into the stream, then disconnect once it has been idle.
stream << event
stream.disconnect_all(after_idle_for=args.after_idle_for)

The source configuration for raw file data has a slightly different API.

In [None]:
# Raw source: delivers the file data itself; files are kept in place
# (keep_file=True) and no move_after_read is given.
source_config_raw = {
    'kind': 'file-source-raw',
    'path': source_path,
    'keep_file': True,
}

Now we're ready to process the uploaded file.

In [None]:
# Stream used to read the raw file data back from the directory. It gets its
# own name, mirroring 'files-upload' / 'files-upload-raw', so it is not
# confused with the earlier 'files-download' stream.
stream = rayvens.Stream('files-download-raw')

# Attach the raw file source described by source_config_raw to the stream.
source = stream.add_source(source_config_raw)

def process_file(event):
    """Decode a raw JSON file payload and print its content.

    ``event`` is the JSON text of the file itself.
    """
    content = json.loads(event)
    print(f"File content: {content}")

# Hand the stream's events to process_file via the rayvens `>>` operator
# (presumably this subscribes the callable to the stream; confirm against the
# rayvens documentation).
stream >> process_file

# Disconnect once the stream has been idle for args.after_idle_for.
stream.disconnect_all(after_idle_for=args.after_idle_for)

In [None]:
# The raw source was configured with keep_file=True, so the file is expected to remain.
still_in_path_raw = os.path.exists(json_path_raw)
print('Files kept in path:', still_in_path_raw)

## Shutting down

Finally, we make sure to shut down everything.

In [None]:
# Disconnect whatever is still attached to the last stream; called without
# after_idle_for here, unlike the earlier disconnects.
stream.disconnect_all()

# Shut down the Ray runtime started at the top of the notebook.
ray.shutdown()