Note: This script needs to be run on a properly configured Dask cluster. It looks for files on a mounted Samba share at /picluster.

In [1]:
import os, subprocess, re, keyring, nltk
from dask.distributed import Client, get_worker
import dask.bag as db
from nltk.tokenize import word_tokenize

# Define the server details
server_name = '192.168.1.200'
shared_dir = 'TechnicalShare'
working_dir = 'datascience/found-infrastructure'

# Get the username and password from keyring
username = 'cluster_worker'
password = keyring.get_password('cluster', username)

# Create an SMB connection and test it
command = f"smbclient //{server_name}/{shared_dir} -U {username}%{password} -c 'exit'"
result = subprocess.run(command, shell=True, capture_output=True, text=True)

if result.returncode == 0:
    print('Connection to file share successful.\n')
else:
    print('Failed to connect to file share.')

# Traverse to the working directory, print its filepath, and list its contents
command = f"smbclient //{server_name}/{shared_dir} -U {username}%{password} -c 'cd {working_dir}; pwd; ls'"
result = subprocess.run(command, shell=True, capture_output=True, text=True)

if result.returncode == 0:
    print('Traversed to working directory successfully.\n')
    # Split the output into lines
    lines = result.stdout.split('\n')
    # Find and print the line that contains the present working directory
    for line in lines:
        if 'Current directory is' in line:
            print(line)
            break
    print('\nDirectory contents incude:\n')
    # Filter out hidden files and specified files/directories
    exclude_items = ['.', '__pycache__']
    lines = [line.strip() for line in lines if not any(item in line.strip() for item in exclude_items)]
    print('\n'.join(lines))
else:
    print('Failed to traverse to working directory.')


Connection to file share successful.

Traversed to working directory successfully.

Current directory is \\192.168.1.200\TechnicalShare\datascience\found-infrastructure\

Directory contents incude:

distributed_compute                 D        0  Fri Nov  3 19:31:55 2023
assets                              D        0  Thu Oct 19 18:22:58 2023
sources                             D        0  Thu Oct 19 18:22:56 2023
standalone_compute                  D        0  Thu Oct 19 18:22:57 2023
modules                             D        0  Thu Oct 19 19:42:33 2023




In [2]:
# Start parallel computing with Dask. 
# Note that Dask needs the absolute file path to source files, not the network file path used above. 
# The location of the mounted Samba share needs to be the same on each node.

# Connect to Dask distributed cluster
from dask.distributed import Client
client = Client('tcp://192.168.1.200:8786') 

input_files = '../sources/trollope/*.txt'
output_file = 'output/trollope_entities.csv'

# Use Dask's read_text to get the contents of the directory. 
import dask.bag as db
contents = db.read_text(input_files).compute()

# Load your text files
b = db.read_text(input_files)

# Set the mode ('complete' or 'novels')
mode = 'novels'  # Change this to 'novels' if needed

# Define your regular expression for the title markers
title_marker = re.compile(r'\[[A-Z\']+\]')

def process_text(text):
    # Tokenize the text
    tokens = word_tokenize(text)

    # If analyzing the entire corpus, you can remove the title markers
    if mode == 'complete':
        tokens = [token for token in tokens if not title_marker.match(token)]
        return ' '.join(tokens)

    # If analyzing individual novels, you can split the tokens list into sublists
    elif mode == 'novels':
        novels = []
        novel = []
        for token in tokens:
            if title_marker.match(token):
                if novel:  # if the novel list is not empty, add it to novels
                    novels.append(novel)
                    novel = []  # start a new novel
            else:
                novel.append(token)

        if novel:  # add the last novel
            novels.append(novel)

        return '\n'.join([' '.join(novel_tokens) for novel_tokens in novels])

# Apply the function to each text file in parallel
results = b.map(process_text).compute()

# Save results 
with open(output_file, 'w') as output_file:
    output_file.write('\n'.join(results))


+-------------+-----------------+-----------------+--------------------+
| Package     | Client          | Scheduler       | Workers            |
+-------------+-----------------+-----------------+--------------------+
| cloudpickle | 3.0.0           | 3.0.0           | {'2.2.1', '3.0.0'} |
| msgpack     | 1.0.7           | 1.0.7           | {'1.0.5', '1.0.7'} |
| numpy       | 1.26.1          | 1.26.1          | None               |
| pandas      | 2.1.1           | 2.1.1           | None               |
| python      | 3.10.12.final.0 | 3.10.12.final.0 | 3.9.2.final.0      |
| tornado     | 6.3.3           | 6.3.3           | {'6.3.2', '6.3.3'} |
+-------------+-----------------+-----------------+--------------------+


KeyboardInterrupt: 

In [3]:
# Check that the file has been tokenized. 

def is_tokenized(file_path):
    with open(file_path, 'r') as f:
        first_line = f.readline()
        # If the first line contains spaces, it's likely the file has been tokenized
        return ' ' in first_line

print(is_tokenized('/picluster/datascience/trollope-gutenberg/distributed_compute/nltk/nltk-output/tokenized_merged_novels.txt'))

FileNotFoundError: [Errno 2] No such file or directory: '/picluster/datascience/trollope-gutenberg/distributed_compute/nltk/nltk-output/tokenized_merged_novels.txt'