In [None]:
from typing import BinaryIO, Iterator, Iterable
import pandas as pd
import traceback
import zstandard
import datetime
import requests
import time
import tqdm
import sys
import os

In [9]:
# Input: path to a single dump file, or to a folder containing dump files.
fileOrFolderPath = 'data/posts/r_tattoos_posts_all.jsonl'

# Imgs downloading config:
# NOTE(review): in the visible notebook only `download_images` is read (it gates
# the download loop); `subreddit`, `compress_images`, `compress_quality` and
# `download_folder` are not referenced, and the download cell writes to its own
# `output_dir` instead of `download_folder` - confirm whether they are still needed.
subreddit = "tattoos"
download_images = False
compress_images = True
compress_quality = 70
download_folder = "../data/downloaded_imgs"

In [10]:
def formatTime(seconds: float) -> str:
	"""Render a duration given in seconds as a short human-readable string.

	Exactly zero gives "0s"; values below one millisecond are shown in
	microseconds, values below one second in milliseconds, and everything
	else as zero-padded HH:MM:SS (fractions of a second are truncated).
	"""
	if seconds == 0:
		return "0s"
	if seconds < 1:
		# Sub-second durations: choose the unit that keeps the number readable.
		if seconds < 0.001:
			return f"{seconds * 1_000_000:.1f}µs"
		return f"{seconds * 1_000:.2f}ms"
	wholeHours, withinHour = divmod(seconds, 3600)
	hh = int(wholeHours)
	mm = int(withinHour // 60)
	ss = int(seconds % 60)
	return f"{hh:02}:{mm:02}:{ss:02}"

In [11]:
class FileProgressLog:
	"""Prints a single, self-overwriting progress line while a file is being read.

	Progress is the read position of `file` divided by the on-disk size of
	`path`. Call `onRow()` once per parsed row; a line is printed every
	`printEvery` rows, and `printEvery` is re-tuned from the measured time per
	row. Call `logProgress("\\n")` once at the end to print the final state.
	"""
	file: BinaryIO
	fileSize: int
	i: int
	startTime: float
	printEvery: int
	maxLineLength: int

	def __init__(self, path: str, file: BinaryIO):
		"""Start timing and record the size of `path` as the 100% mark."""
		self.file = file
		self.fileSize = os.path.getsize(path)
		self.i = 0  # rows seen so far
		self.startTime = time.time()
		self.printEvery = 10_000
		self.maxLineLength = 0  # longest line printed so far, used for padding
	
	def onRow(self):
		"""Count one row and print a progress line every `printEvery` rows."""
		self.i += 1
		if self.i % self.printEvery == 0:
			self.logProgress()
		
	def logProgress(self, end=""):
		"""Print the progress line, terminated by `end`.

		Safe to call before any row was counted and for zero-byte files; both
		cases used to raise ZeroDivisionError.
		"""
		if self.file.closed or self.fileSize <= 0:
			# Nothing (more) can be read: a closed or empty file counts as done.
			progress = 1.0
		else:
			progress = self.file.tell() / self.fileSize
		elapsed = time.time() - self.startTime
		remaining = (elapsed / progress - elapsed) if progress > 0 else 0
		timePerRow = elapsed / self.i if self.i > 0 else 0.0
		printStr = f"{self.i:,} - {progress:.2%} - elapsed: {formatTime(elapsed)} - remaining: {formatTime(remaining)} - {formatTime(timePerRow)}/row"
		# Pad to the longest line so "\r" overwrites leftovers of a previous, longer line.
		self.maxLineLength = max(self.maxLineLength, len(printStr))
		printStr = printStr.ljust(self.maxLineLength)
		print(f"\r{printStr}", end=end)

		if self.i == 0:
			# No timing data yet; keep the current print interval.
			return
		# Faster rows -> print less often, so printing never dominates the runtime.
		if timePerRow < 20/1000/1000:
			self.printEvery = 20_000
		elif timePerRow < 50/1000/1000:
			self.printEvery = 10_000
		else:
			self.printEvery = 5_000

In [12]:

try:
	import orjson as json
except ImportError:
	import json
	print("Recommended to install 'orjson' for faster JSON parsing")

def getZstFileJsonStream(f: BinaryIO, chunk_size=1024*1024*10) -> Iterator[dict]:
	"""Yield one parsed JSON value per line of a zstandard-compressed JSON-lines file.

	The stream is decompressed `chunk_size` bytes at a time. Bytes are buffered
	and split on newlines *before* decoding, so a multi-byte UTF-8 character
	that straddles two chunks is decoded intact (decoding each chunk on its own
	turned such characters into U+FFFD).

	Lines that fail to parse are reported (message on stdout, traceback on
	stderr) and skipped. A decompression error is reported and ends the stream;
	whatever is still buffered is then parsed as the final line.
	"""
	decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
	pending = b""  # bytes after the last newline seen so far (an incomplete line)

	def parseLine(rawLine: bytes) -> Iterator[dict]:
		# Yields zero or one value so the caller can simply `yield from` it.
		line = rawLine.decode("utf-8", "replace")
		try:
			yield json.loads(line)
		except json.JSONDecodeError:
			print("Error parsing line: " + line)
			traceback.print_exc()

	zstReader = decompressor.stream_reader(f)
	while True:
		try:
			chunk = zstReader.read(chunk_size)
		except zstandard.ZstdError:
			print("Error reading zst chunk")
			traceback.print_exc()
			break
		if not chunk:
			break
		# 0x0A never occurs inside a multi-byte UTF-8 sequence, so splitting the
		# raw bytes on b"\n" cannot cut a character in half.
		*completeLines, pending = (pending + chunk).split(b"\n")
		for rawLine in completeLines:
			yield from parseLine(rawLine)

	# The file may end without a trailing newline.
	if pending:
		yield from parseLine(pending)

def getJsonLinesFileJsonStream(f: BinaryIO) -> Iterator[dict]:
	"""Yield one parsed JSON value per line of an uncompressed JSON-lines file.

	Each line is decoded as UTF-8 (invalid bytes are replaced). Lines that
	cannot be parsed are reported together with a traceback and skipped.
	"""
	for rawLine in f:
		text = rawLine.decode("utf-8", errors="replace")
		try:
			parsed = json.loads(text)
		except json.JSONDecodeError:
			print("Error parsing line: " + text)
			traceback.print_exc()
		else:
			yield parsed

def getFileJsonStream(path: str, f: BinaryIO) -> Iterator[dict]|None:
	if path.endswith(".jsonl"):
		return getJsonLinesFileJsonStream(f)
	elif path.endswith(".zst"):
		return getZstFileJsonStream(f)
	else:
		return None

In [13]:

version = sys.version_info
if version.major < 3 or (version.major == 3 and version.minor < 10):
	raise RuntimeError("This script requires Python 3.10 or higher")
import os





recursive = False


def processFile(path: str):
	print(f"Processing file {path}")
	post_data = []
	with open(path, "rb") as f:
		jsonStream = getFileJsonStream(path, f)
		if jsonStream is None:
			print(f"Skipping unknown file {path}")
			return
		progressLog = FileProgressLog(path, f)
		for row in jsonStream:
			progressLog.onRow()
			
			# Permalink, Id, Subreddit, User, Type, Title, Content, Timestamp, NoLikes, NoReplies, ImagesUrls
			
			permalink = row["permalink"]
			id = row["id"]
			subreddit = row["subreddit"]
			user = row["author"]
			type = 'Post'
			title = row["title"]
			content = row["selftext"]
			timestamp = datetime.datetime.fromtimestamp(row.get("created_utc", 0)).strftime('%Y-%m-%d %H:%M:%S')
			score = row["score"]
			replies = row["num_comments"]
			
			images_urls = []
			media_metadata = row.get('media_metadata')
			if isinstance(media_metadata, dict):
				for img in media_metadata.values():
					if isinstance(img, dict):
						if img.get('e') == 'Image':
							s = img.get('s')
							if isinstance(s, dict):
								url = s.get('u')
								if url:
									images_urls.append(url.replace("&amp;", "&"))
						else:
							pass
			elif isinstance(row.get('preview'), dict):
				images = row['preview'].get('images', [])
				for image in images:
					source = image.get('source', {})
					url = source.get('url')
					if url:
						images_urls.append(url.replace("&amp;", "&"))
			elif 'url' in row and row.get('post_hint') == 'image':
				images_urls = [row['url']]

			if not images_urls:
				continue

			post_data.append({
				"Permalink": permalink,
				"Id": id,
				"Subreddit": subreddit,
				"User": user,
				"Type": type,
				"Title": title,
				"Content": content,
				"Timestamp": timestamp,
				"NoLikes": score,
				"NoReplies": replies,
				"ImagesUrls": images_urls,
			})

			# print(f"Link: {permalink} - Id: {id} - r/: {subreddit} - User: {user} - Type: {type} - Title: {title} - Content: {content} - Time: {timestamp} - Score: {score} - Replies: {replies} - ImagesUrls: {images_urls}")

		progressLog.logProgress("\n")
		df = pd.DataFrame(post_data)
		return df
	

def processFolder(path: str) -> pd.DataFrame:
	"""Process every file in `path` and return all image posts in one DataFrame.

	With the module-level `recursive` flag set, sub-directories are walked as
	well; otherwise only the regular files directly inside `path` are read (in
	sorted order). Files that processFile does not recognise, or that yield no
	image posts, contribute nothing. Returns an empty DataFrame when nothing
	was found.
	"""
	fileIterator: Iterable[str]
	if recursive:
		fileIterator = (
			os.path.join(root, name)
			for root, _dirs, names in os.walk(path)
			for name in names
		)
	else:
		# os.listdir also returns sub-directories; opening one as a file would raise.
		fileIterator = (
			candidate
			for candidate in (os.path.join(path, name) for name in sorted(os.listdir(path)))
			if os.path.isfile(candidate)
		)

	frames = []
	for i, file in enumerate(fileIterator):
		print(f"Processing file {i+1: 3} {file}")
		fileDf = processFile(file)
		if fileDf is not None and not fileDf.empty:
			frames.append(fileDf)

	if not frames:
		return pd.DataFrame()
	return pd.concat(frames, ignore_index=True)

# Either branch now defines `df`; previously the folder branch discarded the
# results, so `df.head()` below failed with NameError.
if os.path.isdir(fileOrFolderPath):
	df = processFolder(fileOrFolderPath)
else:
	df = processFile(fileOrFolderPath)
if df is None:
	raise ValueError(f"Unsupported input file: {fileOrFolderPath}")

print("Done :>")
df.head()

Processing file data/posts/r_tattoos_posts_all.jsonl
590,470 - 100.00% - elapsed: 00:00:05 - remaining: 0s - 9.1µs/row     
Done :>


Unnamed: 0,Permalink,Id,Subreddit,User,Type,Title,Content,Timestamp,NoLikes,NoReplies,ImagesUrls
0,/r/tattoos/comments/6qs03/alliance_of_professi...,6qs03,tattoos,tat2ts,Post,Alliance of Professional Tattooists,,2008-07-08 16:42:43,1,0,[https://i.redditmedia.com/DDktIBjmhvm2-4R_f17...
1,/r/tattoos/comments/8o8gy/reddit_what_do_you_t...,8o8gy,tattoos,[deleted],Post,"reddit, what do you think of my sleeve so far?",,2009-05-29 16:56:55,13,12,[https://i.redditmedia.com/3KpS4bOrqFgUN0X8N29...
2,/r/tattoos/comments/bq4qj/cross_post_from_ask_...,bq4qj,tattoos,SinAndInk,Post,Cross post from Ask Reddit: What does everyone...,,2010-04-13 07:55:08,4,5,[https://i.redditmedia.com/4xFezp8qybWigpg6WN5...
3,/r/tattoos/comments/bvoa3/gold_rope_chains_bar...,bvoa3,tattoos,dirtyrobot,Post,Gold Rope Chains (Bart Simpson & More),,2010-04-25 01:26:28,8,7,[https://i.redditmedia.com/GSsEy1tJUlwYzkNPWuu...
4,/r/tattoos/comments/c2cff/teufels_mit_dudelsack/,c2cff,tattoos,daddydicklooker,Post,Teufels Mit Dudelsack,,2010-05-11 02:14:29,15,6,[https://i.redditmedia.com/nifywFKrkbmBB9viRh_...


In [14]:
# Save the extracted posts to a CSV file (without the index column).
# NOTE(review): `ImagesUrls` holds Python lists, which CSV stores as their string
# representation; they need to be parsed again when the file is read back - confirm
# this is intended.
df.to_csv('tattoos_posts.csv', index=False)

In [15]:
# Peek at the per-post lists of image URLs.
df['ImagesUrls'].head()

0    [https://i.redditmedia.com/DDktIBjmhvm2-4R_f17...
1    [https://i.redditmedia.com/3KpS4bOrqFgUN0X8N29...
2    [https://i.redditmedia.com/4xFezp8qybWigpg6WN5...
3    [https://i.redditmedia.com/GSsEy1tJUlwYzkNPWuu...
4    [https://i.redditmedia.com/nifywFKrkbmBB9viRh_...
Name: ImagesUrls, dtype: object

In [16]:
# Convert 'Timestamp' from the '%Y-%m-%d %H:%M:%S' strings produced by processFile
# to datetime values, so the `.dt` accessor can be used in the cells below.
# Re-running this cell is harmless: the column is simply converted again.
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

In [17]:
# Number of extracted posts per calendar year. `df` only contains posts for which
# at least one image URL was found, so these are image posts, not all posts.
posts_per_year = df.groupby(df['Timestamp'].dt.year).size()

print("Posts per Year:")
posts_per_year

Posts per Year:


Timestamp
2008        1
2009        1
2010       27
2014        2
2015     8166
2016     9468
2017    13690
2018    16679
2019    20500
2020    16482
2021    18361
2022    14923
2023    27789
2024    34867
dtype: int64

In [18]:
# Keep only posts with at least 10 comments; the download section iterates over this subset.
filtered_df = df[df["NoReplies"] >= 10]

# Group the filtered frame itself. Grouping the full `df` by a key taken from
# `filtered_df` only worked through index alignment: the rows that were filtered
# out got a NaN key, which also turned the year labels into floats (2009.0, ...).
posts_per_year = filtered_df.groupby(filtered_df['Timestamp'].dt.year).size()

print("Posts per Year:")
posts_per_year

Posts per Year:


Timestamp
2009.0       1
2010.0      12
2014.0       1
2015.0    1726
2016.0    1932
2017.0    2414
2018.0    3296
2019.0    3861
2020.0    3110
2021.0    4083
2022.0    4313
2023.0    3230
2024.0    3725
dtype: int64

---
# Downloading

In [13]:
import logging

# Folder the downloaded images are written to (created if it does not exist).
# NOTE(review): this cell runs, and creates the folder and log file, even when
# `download_images` is False - confirm that is acceptable.
output_dir = "data/downloaded_imgs/tattoos_json"
os.makedirs(output_dir, exist_ok=True)
# Module-level counters, updated by download_image and by the download loop.
# Re-run this cell to reset them before starting a new download pass.
successful_downloads = 0
failed_downloads = 0

# Append download events to a log file.
# NOTE(review): logging.basicConfig does nothing when the root logger already has
# handlers - confirm the log file is actually written in this kernel.
logging.basicConfig(
    filename="data/download_log.txt",
    filemode="a",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO
)

def _remove_partial(path):
    """Delete a leftover partial download; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logging.warning(f"Could not remove partial file {path}: {e}")


def download_image(url, save_path, max_retries=2, timeout=20, retry_delay=5):
    """Download `url` to `save_path`, retrying on transient failures.

    The body is streamed into `<save_path>.part` and moved into place only when
    it is complete, so an interrupted transfer never leaves a truncated file at
    `save_path` (the download loop treats an existing file as already done).

    Args:
        url: Address of the image.
        save_path: Destination file path.
        max_retries: Maximum number of attempts in total.
        timeout: Timeout in seconds passed to `requests.get`.
        retry_delay: Seconds to wait between attempts.

    Returns:
        True when the file was saved, False otherwise. The module-level
        `successful_downloads` / `failed_downloads` counters are incremented
        accordingly. Exceptions are logged, never raised.

    HTTP 4xx answers other than 408 and 429 are treated as permanent: the URL is
    given up immediately instead of sleeping and asking again.
    """
    global successful_downloads, failed_downloads
    partial_path = f"{save_path}.part"

    for attempt in range(1, max_retries + 1):
        retryable = True
        try:
            logging.info(f"Attempt {attempt} to download {url}")
            # The context manager releases the streamed connection in every case.
            with requests.get(url, stream=True, timeout=timeout) as response:
                status = response.status_code
                if status == 200:
                    with open(partial_path, "wb") as f:
                        for chunk in response.iter_content(64 * 1024):
                            f.write(chunk)
                    os.replace(partial_path, save_path)
                    logging.info(f"Successfully downloaded {url} to {save_path}")
                    successful_downloads += 1
                    return True
                logging.warning(f"Failed to download {url} (status code: {status})")
                retryable = not (400 <= status < 500) or status in (408, 429)
        except requests.exceptions.Timeout:
            logging.warning(f"Timeout occurred while downloading {url}.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Request exception for {url}: {e}")
        except Exception as e:
            logging.error(f"Unexpected error downloading {url}: {e}")

        # Only reached after a failed attempt: drop whatever was written so far.
        _remove_partial(partial_path)

        if not retryable:
            logging.error(f"Skipping {url}: the server response will not change on retry.")
            break
        if attempt < max_retries:
            logging.info(f"Retrying download for {url} (Attempt {attempt + 1}) after waiting for {retry_delay} seconds...")
            time.sleep(retry_delay)  # Wait before retrying
        else:
            logging.error(f"Skipping {url} after {max_retries} failed attempts.")

    failed_downloads += 1
    return False

In [14]:
if download_images:
    # One progress-bar tick per image URL across all filtered posts.
    total_images = sum(len(urls) for urls in filtered_df['ImagesUrls'])
    with tqdm.tqdm(total=total_images, desc="Downloading images") as pbar:
        posts = zip(filtered_df.index, filtered_df['Id'], filtered_df['ImagesUrls'])
        for idx, post_id, image_urls in posts:
            for i, url in enumerate(image_urls):
                # The file name encodes post id, row label and position within the post.
                save_path = os.path.join(output_dir, f"{post_id}_row{idx}_img{i}.jpg")
                if os.path.exists(save_path):
                    # Already on disk from an earlier run: count it as done.
                    logging.info(f"Skipping download for {url} as file already exists.")
                    successful_downloads += 1
                else:
                    # Download the image (updates the global counters itself)
                    download_image(url, save_path)
                # Update the progress bar and display counts of downloads
                pbar.update(1)
                done = successful_downloads + failed_downloads
                pbar.set_postfix({
                    "Successful": successful_downloads,
                    "Failed": failed_downloads,
                    "Remaining": total_images - done
                })

    print(f"Successfully downloaded: {successful_downloads}")
    print(f"Failed downloads: {failed_downloads}")

Downloading images: 100%|██████████| 40578/40578 [16:37:29<00:00,  1.47s/it, Successful=35844, Failed=4734, Remaining=0]       

Successfully downloaded: 35844
Failed downloads: 4734



